storage: support resumable uploads
stephenplusplus committed Nov 12, 2014
1 parent 1cc2031 commit 10cbd2d
Showing 3 changed files with 257 additions and 18 deletions.
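In user terms, this change means an interrupted upload picks up where it left off the next time the same file is written. A minimal sketch of the caller's side, assuming a File handle obtained via `bucket.file()` (bucket and credential setup are elided; the event names follow the code below):

var fs = require('fs');

var file = bucket.file('zebra.jpg');

fs.createReadStream('/photos/zoo/zebra.jpg')
  .pipe(file.createWriteStream({ contentType: 'image/jpeg' }))
  .on('error', function(err) {
    // Surface upload failures (5xx responses are retried internally first).
  })
  .on('complete', function(metadata) {
    // If an earlier attempt was cut short, the bytes already uploaded were
    // skipped automatically before this event fired.
  });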
lib/common/util.js (4 changes: 3 additions & 1 deletion)
@@ -147,6 +147,7 @@ function ApiError(errorBody) {
this.errors = errorBody.errors;
this.code = errorBody.code;
this.message = errorBody.message;
this.response = errorBody.response;
}

util.inherits(ApiError, Error);
@@ -180,7 +181,8 @@ function handleResp(err, resp, body, callback) {
callback(new ApiError({
errors: [],
code: resp.statusCode,
message: body || 'Error during request.',
response: resp
}));
return;
}
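Why thread the raw response through? Later in this commit, file.js reads the Range header off a failed request to find the byte offset to resume from. A hedged sketch of the pattern (`makeRequest` is a hypothetical stand-in for any caller whose callback is fed by util.handleResp):

makeRequest(options, function(err, result) {
  if (err) {
    // err.code mirrors the HTTP status code; err.response is the raw
    // response object, so headers such as 'range' are now reachable.
    console.log(err.code, err.response.headers.range);
    return;
  }
});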
lib/storage/file.js (269 changes: 252 additions & 17 deletions)
@@ -20,10 +20,13 @@

'use strict';

var bufferEqual = require('buffer-equal');
var ConfigStore = require('configstore');
var crypto = require('crypto');
var duplexify = require('duplexify');
var request = require('request');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:common/util
@@ -224,6 +227,36 @@ File.prototype.createReadStream = function() {
return dup;
};

/*! Developer Documentation
*
* `createWriteStream` uses the Resumable Upload API: http://goo.gl/jb0e9D.
*
* The process involves these steps:
*
* 1. POST the file's metadata. We get a resumable upload URI back, then cache
* it with ConfigStore.
* 2. PUT data to that URI with a Content-Range header indicating the
*    position the data begins at. We also cache, at most, the first 16
*    bytes of the data being uploaded.
* 3. Delete the ConfigStore cache after the upload completes.
*
* If the initial upload operation is interrupted, the next time the user
* uploads the file, these steps occur:
*
* 1. Detect the presence of a cached URI in ConfigStore.
* 2. Make an empty PUT request to that URI to get the last byte written to
* the remote file.
* 3. PUT data to the URI starting from the first byte after the last byte
* returned from the call above.
*
* If the user tries to upload entirely different data to the remote file:
*
* 1. -- same as above --
* 2. -- same as above --
* 3. -- same as above --
* 4. Compare the first chunk of the new data with the chunk in cache. If it's
* different, start a new resumable upload (Step 1 of the first example).
*/
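// For illustration only -- an approximate wire trace of the steps above
// (the URI and byte offsets are invented):
//
//   POST {base}/{bucket}/o?name=file.txt&uploadType=resumable  (JSON metadata)
//     => 200 OK, Location: <resumableUri>
//
//   PUT <resumableUri> with 'Content-Range: bytes */*'  (empty body)
//     => 308 Resume Incomplete, Range: 0-4915200
//
//   PUT <resumableUri> with 'Content-Range: bytes 4915201-*/*'  (file data)
//     => 200 OK with the file's metadata when the upload completes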
/**
* Create a writable stream to overwrite the contents of the file in your
* bucket.
@@ -264,30 +297,232 @@ File.prototype.createReadStream = function() {
*/
File.prototype.createWriteStream = function(metadata) {
var that = this;
var configStore = new ConfigStore('gcloud-node');
var makeAuthorizedRequest = that.bucket.storage.makeAuthorizedRequest_;
metadata = metadata || {};

var lastByteWritten;
var resumableUri;
var RETRY_LIMIT = 3;
var retries = 0;

// This is used to hold all data coming in from the user's readable stream. If
// we need to abort a resumable upload to start a new one, this will hold the
// data until we're ready again.
var bufferStream = through();

// This is the stream returned to the user.
var dup = streamEvents(duplexify());

// Wait until we've received data to determine if we're resuming an upload or
// creating a new one.
dup.once('writing', function() {
var config = configStore.get(that.name);

if (config && config.uri) {
resumableUri = config.uri;
resumeUpload();
} else {
startUpload();
}
});

return dup;

// Begin a new resumable upload. Send the metadata and cache the URI returned.
function startUpload() {
var headers = {};

if (metadata.contentType) {
headers['X-Upload-Content-Type'] = metadata.contentType;
}

makeAuthorizedRequest({
method: 'POST',
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: encodeURIComponent(that.bucket.name)
}),
qs: {
name: that.name,
uploadType: 'resumable'
},
headers: headers,
json: metadata
}, function(err, body, resp) {
if (err) {
handleError(err);
return;
}

lastByteWritten = -1;
resumableUri = resp.headers.location;

configStore.set(that.name, {
uri: resumableUri
});

resumeUpload();
});
}

// Given a byte offset, create an authorized request to the resumable URI. If
// resuming an upload, we first determine the last byte written, then create
// the authorized request.
function resumeUpload() {
if (util.is(lastByteWritten, 'number')) {
createUploadRequest(lastByteWritten);
} else {
getLastByteWritten(createUploadRequest);
}

function createUploadRequest(offset) {
makeAuthorizedRequest({
method: 'PUT',
uri: resumableUri
}, {
onAuthorized: function(err, reqOpts) {
if (err) {
handleError(err);
return;
}

sendFile(reqOpts, offset + 1);
}
});
}
}

// Given an authorized request and a byte offset, begin sending data to the
// resumable URI from where the upload last left off.
function sendFile(reqOpts, offset) {
reqOpts.headers['Content-Range'] = 'bytes ' + offset + '-*/*';
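// e.g. 'bytes 0-*/*' for a brand-new upload, or 'bytes 4915201-*/*' when
// resuming (offset invented; '*/*' leaves the end byte and total size open).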

var bytesWritten = 0;

var offsetStream = through(function(chunk, enc, next) {
// Determine if this is the same content uploaded previously. We do this
// by caching a slice of the first chunk, then comparing it with the first
// 16 bytes of incoming data.
if (bytesWritten === 0) {
var cachedFirstChunk = configStore.get(that.name).firstChunk;
var firstChunk = chunk.slice(0, 16);

if (!cachedFirstChunk) {
// This is a new upload. Cache the first chunk.
configStore.set(that.name, {
uri: reqOpts.uri,
firstChunk: firstChunk
});
} else {
// This is a continuation of an upload. Make sure the first bytes are
// the same.
cachedFirstChunk = new Buffer(cachedFirstChunk);
firstChunk = new Buffer(firstChunk);

if (!bufferEqual(cachedFirstChunk, firstChunk)) {
// The data being uploaded now is different than the original data.
// Give the chunk back to the stream and create a new upload stream.
bufferStream.unshift(chunk);
bufferStream.unpipe(this);

configStore.del(that.name);

startUpload();
return;
}
}
}

var length = chunk.length;

if (util.is(chunk, 'string')) {
length = Buffer.byteLength(chunk, enc);
}

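// Skip whatever the server already has: when resuming at byte `offset`,
// the part of this chunk that precedes it is sliced away. E.g. resuming
// at offset 8 after 5 bytes have streamed drops this chunk's first 3 bytes.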
if (bytesWritten < offset) {
chunk = chunk.slice(offset - bytesWritten);
}

bytesWritten += length;

// Only push data to the request stream from the byte after the one we
// left off on.
if (bytesWritten > offset) {
this.push(chunk);
}

next();
});

var writeStream = request(reqOpts);
writeStream.callback = util.noop;

writeStream.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
handleError(err);
return;
}

that.metadata = data;
dup.emit('complete', that.metadata);

configStore.del(that.name);
});
});

bufferStream.pipe(offsetStream).pipe(writeStream);
dup.setWritable(bufferStream);
}

// If an upload to this file has previously started, this will return the last
// byte written to it.
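// It works by sending an empty PUT with 'Content-Range: bytes */*'. The API
// replies with 308 (Resume Incomplete) and, when bytes have been persisted,
// a Range header in the '##-##' format noted below.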
function getLastByteWritten(callback) {
makeAuthorizedRequest({
method: 'PUT',
uri: resumableUri,
headers: {
'Content-Length': 0,
'Content-Range': 'bytes */*'
}
}, function(err) {
var RESUME_INCOMPLETE_STATUS = 308;

if (err && err.code === RESUME_INCOMPLETE_STATUS) {
// headers.range format: ##-## (e.g. 0-4915200)
if (err.response.headers.range) {
callback(parseInt(err.response.headers.range.split('-')[1], 10));
return;
}
}

// Start from the first byte.
callback(-1);
});
}

// Handle an error from API calls following the recommended best practices:
// http://goo.gl/AajKku
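// In short: a 404 means the resumable session no longer exists, so start
// over; 5xx responses are retried, by resuming, up to RETRY_LIMIT times.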
function handleError(err) {
if (err.code === 404) {
startUpload();
return;
}

if (err.code > 499 && err.code < 600 && retries <= RETRY_LIMIT) {
retries++;

// Reset `lastByteWritten` so we update this value by pinging the API.
lastByteWritten = null;

resumeUpload();
return;
}

dup.emit('error', err);
dup.end();
}
};

/**
package.json (2 changes: 2 additions & 0 deletions)
@@ -44,6 +44,8 @@
"google storage"
],
"dependencies": {
"buffer-equal": "0.0.1",
"configstore": "^0.3.1",
"duplexify": "^3.1.2",
"extend": "^1.3.0",
"google-service-account": "^1.0.0",
