Uploading and Downloading Files: Streaming in Node.js
In this post, you'll learn how to stream files between clients, Node.js, and Oracle Database.
While the buffer APIs are easier to use to upload and download files, the streaming APIs are a great way to better manage memory and concurrency.
Overview
The streaming APIs in Node.js are designed to leverage and simplify its evented nature. There are four different stream classes: Readable, Writable, Transform, and Duplex. Readable streams (which include Transform and Duplex) have a pipe method that can be thought of like the pipe command in Linux, where the standard output of one command can be piped into the standard input of another command. Here's a command-line example that takes the output from the ls (list) command and pipes it through the grep (search) command to show files that have the word "oracle" in the name:
ls | grep oracle
With Node.js, the syntax is a bit different. Instead of the pipe command, the pipe method of a readable stream is invoked and passed a writeable stream:
readableStream.pipe(writeableStream);
The pipe method returns the destination stream, which allows multiple calls to be chained together as a pipeline. Here's an example:
fileReadable.pipe(gzipTransform).pipe(encryptTransform).pipe(fileWriteable);
The pipe method includes built-in backpressure support. If the consumer is unable to consume data as fast as the producer can produce it, then the producer will be paused until the consumer catches up.
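For comparison, here's a rough sketch of what handling backpressure manually looks like without pipe (using the same hypothetical stream names as above):

// Roughly what pipe does for you behind the scenes.
readableStream.on('data', (chunk) => {
  // write() returns false when the destination's internal buffer is full.
  if (!writeableStream.write(chunk)) {
    readableStream.pause();

    // Resume reading once the destination has drained its buffer.
    writeableStream.once('drain', () => readableStream.resume());
  }
});

readableStream.on('end', () => writeableStream.end());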
Error handling can be a little tricky with streams. By default, errors are not automatically forwarded through pipe because it would be difficult to determine where the error occurred. Errors can be forwarded manually, or the pipeline method of the stream module can be used to automate error forwarding and cleanup.
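For example, here's a sketch of the earlier chained example rewritten with the pipeline method so that a single callback receives any error raised along the way (the stream names are the same placeholders used above):

const { pipeline } = require('stream');

pipeline(
  fileReadable,
  gzipTransform,
  encryptTransform,
  fileWriteable,
  (err) => {
    if (err) {
      // A failure in any of the streams lands here, and pipeline cleans up the rest.
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);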
Uploading Files With Node.js
Controller Logic
As mentioned in the post on buffering, the request object passed to the request listener is a readable stream. To validate the file size, I create a FileValidator class based on the Transform class and then pipe the request stream into a new instance of the FileValidator class. The FileValidator instance is then passed to the database API for processing.
const { Transform } = require('stream');

// Create a new transform stream class that can validate files.
class FileValidator extends Transform {
  constructor(options) {
    super(options.streamOptions);

    this.maxFileSize = options.maxFileSize;
    this.totalBytesInBuffer = 0;
  }

  _transform(chunk, encoding, callback) {
    this.totalBytesInBuffer += chunk.length;

    // Look to see if the file size is too large.
    if (this.totalBytesInBuffer > this.maxFileSize) {
      const err = new Error(`The file size exceeded the limit of ${this.maxFileSize} bytes`);
      err.code = 'MAXFILESIZEEXCEEDED';

      callback(err);
      return;
    }

    this.push(chunk);

    callback(null);
  }

  _flush(done) {
    done();
  }
}
async function post(req, res, next) {
  try {
    // Get a new instance of the transform stream class.
    const fileValidator = new FileValidator({
      maxFileSize: 1024 * 1024 * 50 // 50 MB
    });
    let contentType = req.headers['content-type'] || 'application/octet-stream';
    let fileName = req.headers['x-file-name'];

    if (!fileName) {
      res.status(400).json({error: 'The file name must be passed in the x-file-name header'});
      return;
    }

    // Pipe the request stream into the transform stream.
    req.pipe(fileValidator);

    // Could happen if the client cancels the upload. Forward upstream as an error.
    req.on('aborted', function() {
      fileValidator.emit('error', new Error('Upload aborted.'));
    });

    try {
      const fileId = await files.create(fileName, contentType, fileValidator);

      res.status(201).json({fileId: fileId});
    } catch (err) {
      console.error(err);

      res.header('Connection', 'close');

      if (err.code === 'MAXFILESIZEEXCEEDED') {
        res.status(413).json({error: err.message});
      } else {
        res.status(500).json({error: 'Oops, something broke!'});
      }

      req.connection.destroy();
    }
  } catch (err) {
    next(err);
  }
}
Overview:
- The Transform class is required in from the stream module and used to create a new FileValidator class. The FileValidator class tracks the number of bytes streaming through it and raises an error if the total exceeds its maxFileSize property.
- In the post handler, a number of variables are declared, including an instance of the FileValidator class. The readable request stream is then piped into the FileValidator instance.
- A listener is added to the aborted event of the request stream. If the event is triggered, then no further processing is needed by the database API, so an error is manually triggered on the FileValidator instance.
- The FileValidator instance is passed to the database API's create method. If successful, the id of the file is returned to the client; otherwise an error is returned. A small client that streams a file to this endpoint is sketched after this list.
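To exercise the upload endpoint, a client can stream the file into the request body rather than reading it all into memory first. Here's a minimal sketch, assuming the handler above is mounted at http://localhost:3000/api/files (the host, port, and route are placeholders; adjust them to match your app):

const fs = require('fs');
const http = require('http');
const path = require('path');

const filePath = process.argv[2]; // e.g. node upload.js ./report.pdf

const req = http.request(
  {
    method: 'POST',
    host: 'localhost',  // assumed host
    port: 3000,         // assumed port
    path: '/api/files', // assumed route
    headers: {
      'Content-Type': 'application/octet-stream',
      'X-File-Name': path.basename(filePath)
    }
  },
  (res) => {
    let body = '';

    res.on('data', (chunk) => (body += chunk));
    res.on('end', () => console.log(res.statusCode, body));
  }
);

// Stream the file into the request so only small chunks are in memory at a time.
fs.createReadStream(filePath).pipe(req);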
Database Logic
Once the controller logic has passed along the readable stream and related metadata, the database logic can begin to stream in the file contents. To start, an empty blob is inserted into the database and a pointer to it is returned as an out bind. The pointer is an instance of the Lob class of node-oracledb.
The Lob class is implemented as a Duplex stream, so it supports both reading and writing depending on the context. The readable stream is piped into the Lob instance, and when the file finishes streaming in, the transaction is committed.
const createSql =
`insert into jsao_files (
file_name,
content_type,
blob_data
) values (
:file_name,
:content_type,
empty_blob()
) returning
id,
blob_data
into :id,
:blob_data`;
async function create(fileName, contentType, contentStream) {
  return new Promise(async (resolve, reject) => {
    let conn;

    try {
      conn = await oracledb.getConnection();

      let result = await conn.execute(
        createSql,
        {
          file_name: fileName,
          content_type: contentType,
          id: {
            type: oracledb.NUMBER,
            dir: oracledb.BIND_OUT
          },
          blob_data: {
            type: oracledb.BLOB,
            dir: oracledb.BIND_OUT
          }
        }
      );

      const lob = result.outBinds.blob_data[0];

      contentStream.pipe(lob);

      contentStream.on('error', err => {
        // Forward error along to handler on lob instance
        lob.emit('error', err);
      });

      lob.on('error', async err => {
        try {
          await conn.close();
        } catch (err) {
          console.error(err);
        }

        reject(err);
      });

      lob.on('finish', async () => {
        try {
          await conn.commit();

          resolve(result.outBinds.id[0]);
        } catch (err) {
          console.log(err);

          reject(err);
        } finally {
          try {
            await conn.close();
          } catch (err) {
            console.error(err);
          }
        }
      });
    } catch (err) {
      reject(err);
    }
  });
}
Overview:
- A SQL insert statement is declared. Within it, an empty blob is inserted into the blob_data column and returned out via the returning clause and the bind variable named :blob_data.
- The streaming implementation requires a database connection to stay open while the data is being streamed. A connection is obtained from the connection pool and the SQL statement is executed. The Lob instance is obtained from the out binds on the result object and then contentStream is piped into it.
- Errors triggered on the contentStream instance are re-emitted on the Lob instance to create a single error handling code path (an alternative using the pipeline method is sketched after this list).
- If an error occurs on the Lob instance, the connection to the database is closed, which rolls back the transaction automatically.
- When the finish event is triggered on the Lob instance, the transaction is committed and the id of the row is returned to the controller logic.
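As an aside, the manual error forwarding between contentStream and the Lob could also be replaced with the pipeline method mentioned earlier. Here's a rough sketch of just that portion, assuming the same conn, result, and contentStream variables as the create function above (not a drop-in replacement for the article's code):

const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);
const lob = result.outBinds.blob_data[0];

try {
  // pipeline wires up error handling and cleanup for both streams and
  // resolves once the Lob has finished being written.
  await pipelineAsync(contentStream, lob);

  await conn.commit();

  return result.outBinds.id[0];
} finally {
  // Closing the connection rolls back automatically if the commit never ran.
  await conn.close();
}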
Downloading Files With Node.js
Controller Logic
When downloading a file from the database, the controller logic will be passed an instance of the Lob class from the database API. In this case, the Lob instance is a readable stream, so its output can be piped into the response object, which will stream the data to the client.
async function get(req, res, next) {
  try {
    let aborted = false;
    let row;

    const id = parseInt(req.params.id, 10);

    if (isNaN(id)) {
      res.status(400).json({error: 'Missing or invalid file id'});
      return;
    }

    // Could happen if the client cancels the download. Forward upstream as an error.
    req.on('aborted', function() {
      aborted = true;

      if (row) {
        row.blob_data.emit('error', new Error('Download aborted.'));
      }
    });

    row = await files.get(id);

    // It's possible the aborted event happened before the readable stream was
    // obtained. Reemit the event to handle the error.
    if (aborted) {
      req.emit('aborted');
    }

    if (row) {
      res.status(200);

      res.set({
        'Cache-Control': 'no-cache',
        'Content-Type': row.content_type,
        'Content-Length': row.file_length,
        'Content-Disposition': 'attachment; filename=' + row.file_name
      });

      row.blob_data.pipe(res);
    } else {
      res.status(404).end();
    }
  } catch (err) {
    next(err);
  }
}
Overview:
- The id of the file to be downloaded is parsed from the request parameters and passed along to the get method of the database API to fetch the row with the readable stream and related metadata. A listener is added to the aborted event before requesting the row because that's an async operation and the event could happen while waiting for the row to be returned.
- If the database API successfully returns a row, then the HTTP response status is set to 200 OK. Next, the HTTP headers (including metadata related to the file) are added to the response. Smart clients, such as browsers, know how to use such headers when downloading files. Finally, the file contents are streamed to the client by piping the Lob stream to the response object. A small client that streams the download to disk is sketched after this list.
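As with the upload, a client can stream the download straight to disk instead of buffering it. A minimal sketch, using the same hypothetical host, port, and route as the upload example:

const fs = require('fs');
const http = require('http');

const fileId = process.argv[2]; // e.g. node download.js 42

http.get(`http://localhost:3000/api/files/${fileId}`, (res) => {
  if (res.statusCode !== 200) {
    console.error('Download failed with status', res.statusCode);
    res.resume(); // drain and discard the response body
    return;
  }

  // Pipe the response to a file so large downloads never sit fully in memory.
  const out = fs.createWriteStream('download.out');

  res.pipe(out);

  out.on('finish', () => console.log('Download complete.'));
});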
Database Logic
When it comes to fetching the file content back out of the database, the default type for a BLOB column is the Lob/readable stream, so no code needs to be added for that. The trick is to wait until the streaming is done before closing the connection to the database.
const getSql =
`select file_name "file_name",
dbms_lob.getlength(blob_data) "file_length",
content_type "content_type",
blob_data "blob_data"
from jsao_files
where id = :id`;
async function get(id) {
  const binds = {
    id: id
  };
  let conn;

  try {
    conn = await oracledb.getConnection();

    const result = await conn.execute(getSql, binds, {outFormat: oracledb.OBJECT});

    if (result.rows[0]) {
      result.rows[0].blob_data.on('close', async () => {
        try {
          await conn.close();
        } catch (err) {
          console.log(err);
        }
      });

      result.rows[0].blob_data.on('error', (err) => {
        // destroy will trigger a 'close' event when it's done
        result.rows[0].blob_data.destroy();
      });
    }

    return result.rows[0];
  } catch (err) {
    console.log(err);

    if (conn) {
      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    }

    throw err;
  }
}
Overview:
- A SQL statement to fetch the file data is declared.
- A variable is declared for the bind definitions (no fetch type overrides are needed because the default return type for a BLOB column is a Lob). The query is executed and an event listener is added to the close event of the Lob to close the database connection. An additional listener is added to the error event to ensure connections are closed when errors occur. Finally, the fetched row is returned to the controller logic.
As you can see, implementing file upload and download capabilities using the streaming APIs is a little more complex than with the buffer APIs. Most of the complexity is based around the separation of concerns (controller vs. database logic) and figuring out when connections should be closed and how errors should be handled. However, if you want to work with very large files (over 1 GB) and optimize memory utilization within Node.js, then these APIs are the key.