Hands-on With Node.js Streams: Examples and Approach
What Are Streams?
A stream is an abstract interface for working with data continuously. Every stream is an EventEmitter that implements additional methods, and you can use streams to read, write, and transform data.
In Node.js, streams are used to work with streaming data: the stream module provides the API for building objects that implement the stream interface. Data arrives in chunks and is consumed in chunks, rather than being read all at once.
Piping Streams
As the name suggests, piping keeps data flowing without manual intervention. Piping feeds the output of one stream into another stream as input, and it continues until the source stream has no more data to provide.
Here is how you can execute a piping mechanism:
Begin by creating a file named main.js with the following code:
var fs = require("fs");
// Insert a readable stream
var readerStream = fs.createReadStream('input.txt');
// Insert a writable stream
var writerStream = fs.createWriteStream('output.txt');
// Pipe the read and write operations
// read input.txt and write data to output.txt
readerStream.pipe(writerStream);
console.log("Program Ended");
After that, run main.js to see the output:
$ node main.js
Verify that output.txt contains what you expected.
You can add extra logic to switch between stream modes automatically. With piping, users can connect streams into chains that perform several operations in sequence. The piping mechanism stops once the source stream ends, at which point pipe() calls end() on the destination. You can prevent that by passing an optional config object with the end property set to false, which keeps the destination stream open.
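As a minimal sketch (reusing the input.txt and output.txt files from above), passing { end: false } as the second argument to pipe() keeps the destination open after the source finishes, so you can write to it yourself:
var fs = require("fs");
var readerStream = fs.createReadStream('input.txt');
var writerStream = fs.createWriteStream('output.txt');
// { end: false } tells pipe() not to call writerStream.end() when
// readerStream finishes, so the destination stream stays open.
readerStream.pipe(writerStream, { end: false });
readerStream.on('end', function() {
  // We can still append more data and close the stream ourselves.
  writerStream.end('--- appended after the piped data ---\n');
});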
Chaining Streams
A process similar to Piping, Chaining is used to perform multiple tasks sequentially. It is a method that connects the output of a stream to another stream and thereby performs multiple operations together. Here is an example of how it works:
Make a js file named main.js with the following code:
var fs = require("fs");
var zlib = require('zlib');
// Compress the file Detox.txt to Detox.txt.gz
fs.createReadStream('Detox.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('Detox.txt.gz'));
console.log("File Compressed.");
After adding the code, run main.js to see the output:
$ node main.js
Verify the outcome. The console prints:
File Compressed.
You will find that Detox.txt has been compressed and a new file, Detox.txt.gz, has been created in the current directory. You can decompress that file using the following code:
var fs = require("fs");
var zlib = require('zlib');
// Decompress the file Detox.txt.gz back to Detox.txt
fs.createReadStream('Detox.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('Detox.txt'));
console.log("File Decompressed.");
As we did before, we can run our file to see the output:
$ node main.js
Verify the outcome. The console prints:
File Decompressed.
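One caveat with long pipe() chains is that an error in one stage is not forwarded to the others. As a minimal alternative sketch of the same compression step, recent Node.js versions provide stream.pipeline(), which wires the streams together and reports an error from any stage to a single callback:
var fs = require("fs");
var zlib = require('zlib');
var { pipeline } = require('stream');
// pipeline() connects the streams and forwards errors from any stage.
pipeline(
  fs.createReadStream('Detox.txt'),
  zlib.createGzip(),
  fs.createWriteStream('Detox.txt.gz'),
  function(err) {
    if (err) {
      console.error('Compression failed:', err);
    } else {
      console.log('File Compressed.');
    }
  }
);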
Types of Streams
Streams are capable of performing various functions, depending on the category under which they fall. We can divide streams into the following categories:
Readable Streams
A readable stream, as the name suggests, allows users to read data. It operates in two modes, paused and flowing. All readable streams start in paused mode by default, which means you have to explicitly request data from the stream; in flowing mode, data is pushed to you continuously. The fs.createReadStream() function creates a readable stream from a file, and in paused mode you call read() repeatedly until all of the data has been consumed. Here is an example of a readable stream in paused mode:
var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;
readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});
readableStream.on('end', function() {
  console.log(data);
});
In the above example, the read() function reads data from the internal buffer and returns it. Once there is no more data to read, it returns null and the loop terminates.
The two most important events on a readable stream are the data event and the end event. The data event is emitted whenever the stream passes a chunk of data to the consumer, and the end event is emitted when there is no more data left to consume.
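For comparison, here is a minimal sketch of the same read in flowing mode (again assuming file.txt exists): attaching a data listener switches the stream out of paused mode, and end fires once the file is exhausted:
var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
readableStream.setEncoding('utf8');
var data = '';
// Attaching a 'data' listener puts the stream into flowing mode.
readableStream.on('data', function(chunk) {
  data += chunk;
});
// 'end' is emitted once there is no more data to consume.
readableStream.on('end', function() {
  console.log(data);
});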
Writable Streams
Writable streams, another kind of EventEmitter, let users write data to a chosen destination. You send data to a writable stream with its write() method. The API here is simpler and relies more on methods than events, which makes writable streams easy to pick up. Here is a basic example:
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});
The above example is fairly standard: a readable stream reads the input, and write() sends each chunk to the designated destination. Each call to write() returns a Boolean. If it returns true, the chunk was handled and you can keep writing; if it returns false, the internal buffer is full and you should wait for the drain event before writing more.
Two significant events are usually attached to a writable stream: the drain event and the finish event. The drain event indicates that the stream can accept more data again, while the finish event signals that all data has been flushed to the underlying system.
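Here is a minimal sketch of how those two events are typically used together (the file name and the number of lines are arbitrary): stop writing when write() returns false, resume on drain, and listen for finish after calling end():
var fs = require('fs');
var writableStream = fs.createWriteStream('file2.txt');
var i = 0;
function writeChunks() {
  var ok = true;
  while (i < 100000 && ok) {
    // write() returns false once the internal buffer is full.
    ok = writableStream.write('line ' + i + '\n');
    i++;
  }
  if (i < 100000) {
    // Wait for 'drain' before writing any more data.
    writableStream.once('drain', writeChunks);
  } else {
    writableStream.end();
  }
}
// 'finish' fires after end() is called and all data has been flushed.
writableStream.on('finish', function() {
  console.log('All data written.');
});
writeChunks();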
Duplex Streams
The first two stream types each perform a single role. A duplex stream performs both roles at once, much like a child inheriting genes from both parents. Under the hood, a duplex stream contains two independent channels, one for reading data in and one for writing data out. Below is an example of a basic duplex stream:
var net = require("net");
// An echo server: each connection's socket is piped back to itself.
net.createServer(socket => {
  socket.pipe(socket);
}).listen(8001);
In this example, the socket is piped to itself. A TCP socket is already a duplex stream: the first reference to socket acts as its readable side, while the one inside pipe() acts as its writable side. If you connect to the server with a tool such as netcat and send some data, the server writes the same data straight back to you.
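To make the two sides more visible, here is a rough, purely illustrative sketch of a custom duplex stream built with the Duplex class from the stream module; the readable side pushes a few letters while the writable side just logs what it receives:
const { Duplex } = require('stream');
const letters = ['a', 'b', 'c'];
const duplex = new Duplex({
  // Readable side: push one letter per call, then null to signal the end.
  read() {
    this.push(letters.length ? letters.shift() : null);
  },
  // Writable side: log every chunk that is written to the stream.
  write(chunk, encoding, callback) {
    console.log('wrote:', chunk.toString());
    callback();
  }
});
duplex.on('data', (chunk) => console.log('read:', chunk.toString()));
duplex.write('hello');
duplex.end();
Note that the two sides are completely independent: what you write is not what you read, which is exactly what distinguishes a duplex stream from a transform stream.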
Transform Streams
A transform stream is a duplex stream whose output is computed from its input: what you read from it is a transformed version of what was written to it. In a plain duplex stream the two sides are independent, whereas in a transform stream they are linked. You create one with the Transform class from the stream module, or by using a built-in transform such as the zlib compression streams.
Here is a simple transform stream example:
const fs = require('fs');
const zlib = require('zlib');
const readableStream = fs.createReadStream('file');
const transformStream = zlib.createGzip();
const writableStream = fs.createWriteStream('file.gz');
readableStream.pipe(transformStream).pipe(writableStream);
The above transform stream gzips a file when we run it. Transform streams are a natural fit whenever the output differs in size or shape from the input; here the zlib gzip transform produces output that is smaller than the input.
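For a custom transformation, here is a minimal, purely illustrative sketch: a Transform that upper-cases everything piped through it, wired between standard input and standard output:
const { Transform } = require('stream');
// A transform stream that upper-cases each chunk it receives.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});
process.stdin.pipe(upperCase).pipe(process.stdout);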
Streams Compatibility With Async Generators and Async Iterators
With the help of async generators, we can create a Node.js readable stream using the Readable.from() function, as shown in the example below:
const { Readable } = require('stream');
async function * generate() {
  yield 'a';
  yield 'b';
  yield 'c';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
  console.log(chunk);
});
We can also consume a readable stream with an async iterator by using a for await...of loop inside an async function. Here is an example:
(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})();
Async iterators register a permanent error handler on the stream, which prevents unhandled errors after the stream is destroyed.
We can also write to a writable stream from an async iterator, but we must be careful to handle backpressure and errors.
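As a minimal sketch of one safe way to do that (output.txt is just a placeholder destination), newer Node.js versions let stream.pipeline() take an async iterable as its source, and it then handles backpressure and error propagation for us:
const fs = require('fs');
const { pipeline } = require('stream');
async function * generate() {
  yield 'hello ';
  yield 'streams';
  yield '\n';
}
// pipeline() consumes the async iterator, respects backpressure on the
// writable stream, and reports errors from either side to the callback.
pipeline(generate(), fs.createWriteStream('output.txt'), (err) => {
  if (err) {
    console.error('Pipeline failed:', err);
  } else {
    console.log('Pipeline succeeded.');
  }
});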
Benefits of Streams
Streams are used worldwide, so they clearly offer real benefits. Beyond the fact that even a beginner can pick them up, here are the main ones:
Time Efficiency
What is the benefit of a chain? It keeps the person at the back moving along with the person at the front. In a stream pipeline, piping works the same way: the output of one stream is fed directly into the next, so large amounts of data are processed as they flow rather than after everything has arrived. Because each stage works on data as soon as it receives it, the stages run concurrently and overall processing time drops.
Spatial Efficiency
What do you do when you have a small buffer but a larger input file? You stream the data, displaying each chunk as soon as possible so the buffer stays free for the next one. Suppose you want to read and display a file that is around 35 MB, but your buffer is limited to 25 MB. What do you do in such a situation?
To avert the crisis, create a readable stream. As soon as a chunk of data is read from the input, it goes into the buffer, gets displayed, and is cleared to make room for the next chunk. This keeps memory usage bounded while ensuring the data is fully processed.
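As a rough sketch of that idea (the file name and chunk size are arbitrary), a readable stream created with a highWaterMark option processes the file one bounded chunk at a time instead of loading it all into memory:
const fs = require('fs');
// Read the large file 1 MB at a time instead of loading it all at once.
const readableStream = fs.createReadStream('large-file.txt', {
  highWaterMark: 1024 * 1024
});
readableStream.on('data', (chunk) => {
  // Each chunk is displayed and then discarded, so memory usage stays
  // bounded by the chunk size rather than the file size.
  process.stdout.write(chunk);
});
readableStream.on('end', () => {
  console.log('Done reading.');
});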
Conclusion
Streams are an integral part of Node.js and help developers keep their code simple. With streams, developers can build data-processing code in far less time than before, and for many people they are one of the main reasons to stick with Node.js over other environments. This article should have given you a fair idea of what streams are and how they operate.