Readable Streams in Node.js
Overview
In my previous post, we learned everything we need to know to understand Node.js streams. We covered the four fundamental stream types in Node.js: Writable, Readable, Duplex, and Transform. We also discussed the benefits of using streams, key concepts like buffering, backpressure, and piping, and the importance of events in stream processing. 在我之前的文章中,我们学习了理解 Node.js 流所需的一切。我们介绍了 Node.js 中的四种基本流类型:Writable、Readable、Duplex 和 Transform。我们还讨论了使用流的好处、缓冲、背压和管道等关键概念,以及事件在流处理中的重要性。
In this post, we will dive deeper into Readable
streams in particular: how they are created, how we can consume data from them, and some a number of examples to get you comfortable.
在这篇文章中,我们将特别深入探讨 Readable 流:它们是如何创建的,我们如何从中消费数据,以及一些示例来让你熟悉。
How to create Readable streams
In our previous post, we started with the mental model that Readable streams are like water taps. In the same way that we can turn on the tap, change the flow rate, or turn it off, a Readable stream provides data in chunks that we can consume, pause, resume, or end. 在我们之前的文章中,我们从 Readable 流就像水龙头的心智模型开始。就像我们可以打开水龙头、更改流速或关闭它一样,Readable 流以块的形式提供数据,我们可以使用这些数据块、暂停、恢复或结束这些数据。
But how can we create a Readable stream in Node.js? There are a number of ways, but let's start with what the node:stream
module provides.
但是我们如何在 Node.js 中创建 Readables 流呢?有很多方法,但让我们从 node:stream
模块开始。
stream.Readable.from()
stream.from()
stream.Readable
stream.fromWeb()
From top-to-bottom, I've listed out what I think are the most approachable ways to create a Readable stream in Node.js from the node:stream
module, so let's go through them each.
从上到下,我列出了我认为在 Node.js node:stream
模块中创建 Readable 流的最可行的方法,所以让我们逐一介绍一下。
stream.Readable.from()
The stream.Readable.from()
method creates a Readable stream from an iterable object or an async iterable object. This method is useful when you have an iterable data source, for example an array of strings, that you want to convert into a stream.
stream.Readable.from()
方法从可迭代对象或异步可迭代对象创建 Readable 流。当您有一个要转换为流的可迭代数据源(例如字符串数组)时,此方法非常有用。
const { Readable } = require("stream");
const readableStream = Readable.from(["Hello", " ", "World", "!"]);
For the sake of completion, here is a reminder of how that Readable stream could be paired with a Writable stream to pipe data from one to the other: 为了完整,这里提醒一下如何将 readable 流与 Writable 流配对以将数据从一个管道传输到另一个:
const { Readable, Writable } = require("node:stream");
const fs = require("node:fs");
// Create a Readable stream from an array of strings
const readableStream = Readable.from(["Hello", " ", "World", "!"]);
// Create a Writable stream (in this case, writing to a file)
const writableStream = fs.createWriteStream("output.txt");
// Pipe the Readable stream to the Writable stream
readableStream.pipe(writableStream);
// Handle the 'finish' event on the Writable stream
writableStream.on("finish", () => {
console.log("Finished writing to file");
});
// Handle any errors
writableStream.on("error", (err) => {
console.error("An error occurred:", err);
});
In the above scenario, our readableStream
is piped to the writeableStream
that writes to a file output.txt
.
在上面的场景中,readableStream
通过管道传输到写入文件 output.txt
的 writeableStream
。
The array of ["Hello", " ", "World", "!"]
here is considered an iterable object with four chunks. Each element of the array is treated as a separate chunk in the resulting stream.
数组 [“Hello”, “ ”, “World”, “!”]
在这里可以被认为是包含四个chunk数据的可迭代对象。数组的每个元素都被视为结果流中的单独块。
The writeableStream
consumes each chunk of data and writes it to the file output.txt
. The finish
event is emitted when all data has been written to the file, and the error event is emitted if an error
occurs during the writing process.
writeableStream
把每个数据chunk写入文件 output.txt
。当所有数据都已写入文件时,将触发 finish
事件,如果在写入过程中发生错误,则会触发 error
事件。
To solidify this, here is a verbose sequence diagram: 为了巩固这一点,下面是一个详细的序列图:
stream.from()
This is function that can be imported directly from the node:stream
module and is a shorthand for stream.Readable.from()
.
可直接从 node:stream
模块导入 from
函数,这个函数是 stream.Readable.from()
简写.
const { from, Readable } = require("node:stream");
// These two lines are equivalent:
const stream1 = from(["Hello", "World"]);
const stream2 = Readable.from(["Hello", "World"]);
stream.Readable()
The stream.Readable
class is a base class for creating custom Readable streams. You can extend this class to create your own Readable streams by implementing the _read()
method.
stream.Readable
类是用于创建自定义 Readable 流的基类。您可以通过实现 _read()
方法扩展此类以创建自己的 Readable 流。
const { Readable } = require("stream");
class MyReadable extends Readable {
constructor(options, data) {
super(options);
this.data = data;
}
_read() {
if (this.data.length) {
this.push(this.data.shift());
} else {
this.push(null);
}
}
}
const myReadable = new MyReadable(["Hello", " ", "World", "!"]);
In the above example, we create a custom Readable stream by extending the stream.Readable
class. The _read()
method is implemented to push data chunks from the data array to the stream. When all data has been pushed, null
is pushed to signal the end of the stream.
在上面的例子中,我们通过扩展 stream.Readable
类来创建一个自定义的只读流。 _read()
方法将数据块从 data
数组推送到流中。当所有数据都被推送后, 推送 null
以发出流结束的信号。
stream.fromWeb()
stream.fromWeb()
is designed to convert a Web API ReadableStream into a Node.js Readable stream. It's not always "from the web" in the sense of network requests; it's about compatibility between Web API streams and Node.js streams.
stream.fromWeb()
旨在将 Web API ReadableStream 转换为 Node.js Readable 流。它并不总是用于字面意思, 用于来自Web的网络请求. 它与 Web API 流和 Node.js 流之间的兼容性有关。
const { fromWeb } = require("node:stream/web");
// Create a Web API ReadableStream (this could come from a fetch() response, for example)
const webReadableStream = new ReadableStream({
start(controller) {
controller.enqueue("Hello");
controller.enqueue("World");
controller.close();
},
});
// Convert it to a Node.js Readable stream
const nodeReadableStream = fromWeb(webReadableStream);
// Use the Node.js stream
nodeReadableStream.on("data", (chunk) => {
console.log(chunk.toString());
});
nodeReadableStream.on("end", () => {
console.log("Stream ended");
});