Skip to content

Commit

Permalink
+create[Read,Write]Stream -duplexified File
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Oct 16, 2014
1 parent a396eb6 commit d17d6c5
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 428 deletions.
12 changes: 6 additions & 6 deletions lib/storage/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,12 @@ Bucket.prototype.delete = function(callback) {
* the different use cases you may have.
*
* @param {string} name - The name of the file in this bucket.
* @param {object=} metadata - Metadata to set when writing to this file.
* @return {module:storage/file}
*
* @example
* var file = bucket.file('my-existing-file.png');
*/
Bucket.prototype.file = function(name, metadata) {
return new File(this, name, metadata);
Bucket.prototype.file = function(name) {
return new File(this, name);
};

/**
Expand Down Expand Up @@ -291,16 +289,18 @@ Bucket.prototype.upload = function(localPath, destination, metadata, callback) {
name = path.basename(localPath);
break;
}
metadata = metadata || {};
callback = callback || util.noop;
if (util.is(destination, 'string')) {
name = destination;
}
if (destination instanceof File) {
name = destination.name;
newFile = destination;
}
newFile = newFile || this.file(name, metadata);
newFile = newFile || this.file(name);
fs.createReadStream(localPath)
.pipe(newFile)
.pipe(newFile.createWriteStream(metadata))
.on('error', callback)
.on('complete', function() {
callback(null, newFile);
Expand Down
282 changes: 118 additions & 164 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
'use strict';

var crypto = require('crypto');
var Duplexify = require('duplexify');
var duplexify = require('duplexify');
var extend = require('extend');
var nodeutil = require('util');
var through = require('through2');
var uuid = require('node-uuid');

/**
Expand All @@ -49,60 +47,15 @@ var STORAGE_UPLOAD_BASE_URL = 'https://www.googleapis.com/upload/storage/v1/b';
* extra call to `setMetadata`.
*/
/**
* A File object is created from your Bucket object, using
* {module:storage/bucket#file}. File objects are duplex streams, which means
* they can be read from and written to. No API requests are made until you
* attempt to read or write from the file, by attaching a Readable or Writable
* stream. This means you can create a file on the fly, but you will get an
* error if you try to read from a file in your bucket that does not exist.
*
* For more help with using and understanding Node.js streams, see the
* [Stream documentation]{@link http://nodejs.org/api/stream.html}.
* A File object is created from your Bucket object using
* {module:storage/bucket#file}.
*
* @alias module:storage/file
* @constructor
*
* @example
* //-
* // <h4>Downloading a File</h4>
* //
* // The example below demonstrates how we can reference a remote file, then
* // pipe its contents to a local file. This is effectively creating a local
* // backup of your remote data.
* //-
* var fs = require('fs');
* myBucket.file('image.png')
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*
* //-
* // <h4>Uploading a File</h4>
* //
* // Now, consider a case where we want to upload a file to your bucket. You
* // have the option of using {module:storage/bucket#upload}, but that is just
* // a convenience method which will do the following.
* //-
* var fs = require('fs');
* fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
* .pipe(myBucket.file('panda.jpg'))
* .on('error', function(err) {});
*
* //-
* // <h4>Uploading a File with Metadata</h4>
* //
* // One last case you may run into is when you want to upload a file to your
* // bucket and set its metadata at the same time. Like above, you can use
* // {module:storage/bucket#upload} to do this, which again, is just a wrapper
* // around the following.
* //-
* var fs = require('fs');
* fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
* .pipe(myBucket.file('panda.jpg', { contentType: 'image/jpeg' }))
* .on('error', function(err) {});
*/
function File(bucket, name, metadata) {
Duplexify.call(this);

if (!name) {
throw Error('A file name must be specified.');
}
Expand All @@ -114,16 +67,8 @@ function File(bucket, name, metadata) {
enumerable: true,
value: name
});

this.events_ = {
_read: this._read,
_write: this._write
};
this.bindEvents_();
}

nodeutil.inherits(File, Duplexify);

/**
* Copy this file to another file. By default, this will copy the file to the
* same bucket, but you can choose to copy it to another Bucket by providing
Expand All @@ -133,7 +78,6 @@ nodeutil.inherits(File, Duplexify);
*
* @param {string|{module:storage/bucket}|{module:storage/file}} destination -
* Destination file.
* @param {object=} metadata - Destination file metadata object.
* @param {function=} callback - The callback function.
*
* @example
Expand Down Expand Up @@ -230,6 +174,115 @@ File.prototype.copy = function(destination, callback) {
});
};


/**
* Create a readable stream to read the contents of the remote file. It can be
* piped to a writable stream or listened to for 'data' events to read a file's
* contents.
*
* @example
* //-
* // <h4>Downloading a File</h4>
* //
* // The example below demonstrates how we can reference a remote file, then
* // pipe its contents to a local file. This is effectively creating a local
* // backup of your remote data.
* //-
* var fs = require('fs');
* var image = myBucket.file('image.png');
*
* image.createReadStream()
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*/
File.prototype.createReadStream = function() {
var bucket = this.bucket;
var dup = duplexify();
function createAuthorizedReq(uri) {
bucket.connection_.createAuthorizedReq({ uri: uri }, function(err, req) {
if (err) {
dup.emit('error', err);
return;
}
dup.setReadable(bucket.connection_.requester(req));
});
}
if (this.metadata.mediaLink) {
createAuthorizedReq(this.metadata.mediaLink);
} else {
this.getMetadata(function(err, metadata) {
if (err) {
dup.emit('error', err);
return;
}
createAuthorizedReq(metadata.mediaLink);
});
}
return dup;
};

/**
* Create a writable stream to overwrite the contents of the file in your
* bucket.
*
* A File object can also be used to create files for the first time.
*
* @param {object=} metadata - Set the metadata for this file.
*
* @example
* //-
* // <h4>Uploading a File</h4>
* //
* // Now, consider a case where we want to upload a file to your bucket. You
* // have the option of using {module:storage/bucket#upload}, but that is just
* // a convenience method which will do the following.
* //-
* var fs = require('fs');
* var image = myBucket.file('image.png');
*
* fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
* .pipe(image.createWriteStream())
* .on('error', function(err) {});
*
* //-
* // <h4>Uploading a File with Metadata</h4>
* //
* // One last case you may run into is when you want to upload a file to your
* // bucket and set its metadata at the same time. Like above, you can use
* // {module:storage/bucket#upload} to do this, which is just a wrapper around
* // the following.
* //-
* var fs = require('fs');
* var image = myBucket.file('image.png');
*
* fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
* .pipe(image.createWriteStream({ contentType: 'image/jpeg' }))
* .on('error', function(err) {});
*/
File.prototype.createWriteStream = function(metadata) {
var that = this;
var dup = duplexify();
this.getWritableStream_(metadata, function(err, writable) {
if (err) {
dup.emit('error', err);
return;
}
writable.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
dup.emit('error', err);
return;
}
that.metadata = data;
dup.emit('complete', data);
});
});
dup.setWritable(writable);
dup.pipe(writable);
});
return dup;
};

/**
* Delete the file.
*
Expand All @@ -245,7 +298,6 @@ File.prototype.delete = function(callback) {
callback(err);
return;
}
this.removeAllListeners();
callback();
}.bind(this));
};
Expand Down Expand Up @@ -378,61 +430,19 @@ File.prototype.setMetadata = function(metadata, callback) {
* out.
*/

/**
* Set up and tear down listeners for a new stream. First, overwrite the _read
* and _write methods for the first time a stream is accessed. This causes us to
* fetch an authorized connection before opening the pipe for more data. The
* data is not buffered into memory, rather, it ceases pulling until we say so.
*
* This method is called after a complete event, when we also destroy the
* Duplexify stream and re-apply its constructor to our file instance.
*
* @private
*/
File.prototype.bindEvents_ = function() {
var that = this;

var _read = this.events_._read;
this._read = function() {
that._read = _read.bind(that);
that.setReadableStream_();
_read.apply(that, util.toArray(arguments));
};

var _write = this.events_._write;
this._write = function() {
that._write = _write.bind(that);
that.setWritableStream_();
_write.apply(that, util.toArray(arguments));
};

this.on('error', function() {
that.emit('end');
});

this.on('complete', function() {
that.emit('end');
});

this.on('end', function() {
that.removeAllListeners();
that.destroy();
Duplexify.call(that);
that.bindEvents_();
});
};

/**
* Get a remote stream to begin piping a readable stream to.
*
* @private
*/
File.prototype.getWritableStream_ = function(callback) {
File.prototype.getWritableStream_ = function(metadata, callback) {
if (!callback) {
callback = metadata;
metadata = {};
}
var that = this;
var boundary = uuid.v4();
var metadata = extend({
contentType: 'text/plain'
}, this.metadata);
metadata = extend({ contentType: 'text/plain' }, metadata);
this.bucket.connection_.createAuthorizedReq({
method: 'POST',
uri: util.format('{base}/{bucket}/o', {
Expand Down Expand Up @@ -469,60 +479,4 @@ File.prototype.getWritableStream_ = function(callback) {
});
};

/**
* Set the readable stream for Duplexify after making an authorized connection.
*
* @private
*/
File.prototype.setReadableStream_ = function() {
var that = this;
var bucket = this.bucket;
function createAuthorizedReq(uri) {
bucket.connection_.createAuthorizedReq({ uri: uri }, function(err, req) {
if (err) {
that.emit('error', err);
return;
}
that.setReadable(bucket.connection_.requester(req));
});
}
if (this.metadata.mediaLink) {
createAuthorizedReq(this.metadata.mediaLink);
} else {
this.getMetadata(function(err, metadata) {
if (err) {
that.emit('error', err);
return;
}
createAuthorizedReq(metadata.mediaLink);
});
}
};

/**
* Set the writable stream for Duplexify after making an authorized connection.
*
* @private
*/
File.prototype.setWritableStream_ = function() {
var that = this;
this.getWritableStream_(function(err, writable) {
if (err) {
that.emit('error', err);
return;
}
writable.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
that.emit('error', err);
return;
}
that.emit('complete', data);
});
});
that.setWritable(writable);
through().pipe(writable);
});
};

module.exports = File;
Loading

0 comments on commit d17d6c5

Please sign in to comment.