Streams API
Streams API 是为了解决一个简单但又基础的问题而生的:Web 应用如何消费有序的小信息块而不是大块信息?这种能力主要有两种应用场景。
大块数据可能不会一次性都可用。网络请求的响应就是一个典型的例子。网络负载是以连续信息包形式交付的,而流式处理可以让应用在数据一到达就能使用,而不必等到所有数据都加载完毕。
大块数据可 能需要分小部分处理。视频处理、数据压缩、图像编码和 JSON 解析都是可以分成小部分进行处理,而不必等到所有数据都在内存中时再处理的例子。
理解流
提到流,可以把数据想像成某种通过管道输送的液体。JavaScript 中的流借用了管道相关的概念,因为原理是相通的。根据规范,“这些 API 实际是为映射低级 I/O 原语而设计,包括适当时候对字节流的规范化”。Stream API 直接解决的问题是处理网络请求和读写磁盘。
Stream API 定义了三种流。
-
可读流:可以通过某个公共接口读取数据块的流。数据在内部从底层源进入流,然后由消费者(consumer)进行处理。
-
可写流: 可以通过某个公共接口写入数据块的流。生产者(producer)将数据写入流,数据在内部传入底层数据槽(sink)。
-
转换流: 由两种流组成,可写流用于接收数据(可写端),可读流用于输出数据(可读端)。这两个流之间是转换程序(transformer),可以根据需要检查和修改流内容。
块、内部队列和反压
流的基本单位是块(chunk)。块可是任意数据类型,但通常是定型数组。每个块都是离散的流片段,可以作为一个整体来处理。更重要的是,块不是固定大小的,也不一定按固定间隔到达。在理想的流当中,块的大小通常近似相同,到达间隔也近似相等。不过好的流实现需要考虑边界情况。
前面提到的各种类型的流都有入口和出口的概念。有时候,由于数据进出速率不同,可能会出现不匹配的情况。为此流平衡可能出现如下三种情形。
- 流出口处理数据的速度比入口提供数据的速度快。流出口经常空闲(可能意味着流入口效率较 26 低),但只会浪费一点内存或计算资源,因此这种流的不平衡是可以接受的。
- 流入和流出均衡。这是理想状态。
- 流入口提供数据的速度比出口处理数据的速度快。这种流不平衡是固有的问题。此时一定会在某个地方出现数据积压,流必须相应做出处理。
流不平衡是常见问题,但流也提供了解决这个问题的工具。所有流都会为已进入流但尚未离开流的块提供一个内部队列。对于均衡流,这个内部队列中会有零个或少量排队的块,因为流出口块出列的速度与流入口块入列的速度近似相等。这种流的内部队列所占用的内存相对比较小。
如果块入列速度快于出列速度,则内部队列会不断增大。流不能允许其内部队列无限增大,因此它会使用反压(backpressure)通知流入口停止发送数据,直到队列大小降到某个既定的阈值之下。这个阈值由排列策略决定,这个策略定义了内部队列可以占用的最大内存,即高水位线(high water mark)。
可读流
可读流是对底层数据源的封装。底层数据源可以将数据填充到流中,允许消费者通过流的公共接口读取数据。
ReadableStream
ReadableStream()
构造函数创建并从给定的处理程序返回一个可读的流对象。
/**
* underlyingSource 可选
* queuingStrategy 可选
*/
new ReadableStream();
new ReadableStream(underlyingSource);
new ReadableStream(underlyingSource, queuingStrategy);
underlyingSource
一个包含定义了构造流行为方法和属性的对象。如下列表出方法和属性:
-
start(controller)
这是一个当对象被构造时立刻调用的方法。此方法的内容由开发人员定义,并应着眼于访问流,并执行其他任何必需的设置流功能。如果这个过程是异步完成的,它可以返回一个 promise,表明成功或失败。传递给这个方法的 controller 是一个 ReadableStreamDefaultController 或 ReadableByteStreamController,具体取决于 type 属性的值。开发人员可以使用此方法在设立期间控制流。
-
pull(controller)
这个方法,也是由开发人员定义的,当流的内部队列不满时,会重复调用这个方法,直到队列补满。如果 pull() 返回一个 promise,那么它将不会再被调用,直到 promise 完成;如果 promise 失败,该流将会出现错误。传递给此 方法的 controller 参数是 ReadableStreamDefaultController 或 ReadableByteStreamController,具体取决于 type 属性的值。由于更多的块被获取,这个方法可以被开发人员用来控制流。
-
cancel(reason)
如果应用程序表示该流将被取消(例如,调用了 ReadableStream.cancel(),则将调用此方法,该方法也由开发人员定义。该方法应该做任何必要的事情来释放对流的访问。如果这个过程是异步的,它可以返回一个 promise,表明成功或失败。原因参数包含一个 DOMString,它描述了流被取消的原因。
-
type
该属性控制正在处理的可读类型的流。如果它包含一个设置为 bytes 的值,则传递的控制器对象将是一个 ReadableByteStreamController,能够处理 BYOB(带你自己的缓冲区)/字节流。如果未包含,则传递的控制器将为 ReadableStreamDefaultController。
-
autoAllocateChunkSize
对于字节流,开发人员可以使用正整数值设置 autoAllocateChunkSize 以打开流的自动分配功能。启用此功能后,流实现将自动分配一个具有给定整数大小的 ArrayBuffer,并调用底层源代码,就好像消费者正在使用 BYOB reader 一样。
queuingStrategy
一个可选择定义流的队列策略的对象。这需要两个参数:
-
highWaterMark
非负整数 - 这定义了在应用背压之前可以包含在内部队列中的块的总数。
-
size(chunk)
包含参数 chunk 的方法——这表示每个分块使用的大小(以字节为单位)。
const stream = new ReadableStream({
start(controller) {
interval = setInterval(() => {
let string = randomChars();
// Add the string to the stream
controller.enqueue(string);
// show it on the screen
let listItem = document.createElement('li');
listItem.textContent = string;
list1.appendChild(listItem);
}, 1000);
button.addEventListener('click', function () {
clearInterval(interval);
fetchStream();
controller.close();
});
},
pull(controller) {
// We don't really need a pull in this example
},
cancel() {
// This is called if the reader cancels,
// so we should stop generating strings
clearInterval(interval);
},
});
ReadableStreamDefaultController
ReadableStreamDefaultController
是 Streams API 中的一部分,用于控制 ReadableStream
的行为。ReadableStream
表示可从中读取数据的源,而 ReadableStreamDefaultController
提供了一些方法和属性,用于管理和控制这个流的生成和传输。
ReadableStreamDefaultController
用于控制一个 ReadableStream
实例的内部行为。它主要用于自定义流的实现,例如当你使用 new ReadableStream
构造一个新的流时,可以使用它来管理流的生产和消耗。
主要方法和属性
-
enqueue(chunk)
将一个新的数据块加入到流的队列中。如果消费者(读取流的代码)还没有消耗数据,则会被添加到队列中等待消费。
-
close()
关闭流,表示不再有更多的数据块。一旦调用了
close()
方法,流将进入结束状态,消费者不会再收到新的数据块。 -
error(e)
将流置于错误状态,并将错误 e 传递给消费者。一旦调用了
error(e)
方法,流将无法再正常操作。
来看下面的生成器,它每 1000 毫秒就会生成一个递增的整数:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
这个生成器的值可以通过可读流的控制器传入可读流。访问这个控制器最简单的方式就是创建 ReadableStream 的一个实例,并在这个构造函数的 underlyingSource
参数(第一个参数)中定义 start()
方法,然后在这个方法中使用作为参数传入的 controller
。默认情况下,这个控制器参数是 ReadableStreamDefaultController
的一个实例:
const readableStream = new ReadableStream({
start(controller) {
console.log(controller); // ReadableStreamDefaultController {}
},
});
调用控制器的 enqueue()
方法可以把值传入控制器。所有值都传完之后,调用 close()
关闭流:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
const readableStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});
ReadableStreamDefaultReader
ReadableStreamDefaultReader
允许你从 ReadableStream
中读取数据。它是通过调用 ReadableStream
实例的 getReader
方法来创建的。
主要方法和属性
-
read()
异步方法,从流中读取下一个数据块。返回一个 Promise,该 Promise 解析为一个对象,包含 value(数据块)和 done(布尔值,表示流是否已结束)。
-
releaseLock()
释放读取器对流的锁定,使其他读取器可以锁定该流。释放锁定后,不能再通过该读取器读取数据。
-
closed()
一个 Promise,当流关闭或发生错误时,该 Promise 解析或拒绝。
前面的例子把 5 个值加入了流的队列,但没有把它们从队列中读出来。为此,需要一个 ReadableStreamDefaultReader
的实例,该实例可以通过流的 getReader()
方法获取。调用这个方法会获得流的锁,保证只有这个读取器可以从流中读取值:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
const readableStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});
console.log(readableStream.locked); // false
const readableStreamDefaultReader = readableStream.getReader();
console.log(readableStream.locked); // true
消费者使用这个读取器实例的 read()
方法可以读出值:
(async function () {
while (true) {
const { done, value } = await readableStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();
// 0
// 1
// 2
// 3
// 4
可写流
可写流是底层数据槽的封装。底层数据槽处理通过流的公共接口写入的数据。
WritableStream
WritableStream()
构造函数创建一个新的 WritableStream 对象实例。
new WritableStream();
new WritableStream(underlyingSink);
new WritableStream(underlyingSink, queuingStrategy);
underlyingSink
一个包含方法和属性的对象,这些方法和属性定义了构造的流的实例的具体行为。underlyingSource
可以包括:
start(controller)
write(chunk, controller)
close(controller)
abort(reason)
queuingStrategy
一个可选的定义流的队列策略的对象。这需要两个参数:
highWaterMark
size(chunk)
来看下面的生成器,它每 1000 毫秒就会生成一个递增的整数:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
这些值通过可写流的公共接口可以写入流。在传给 WritableStream 构造函数的 underlyingSink
参数中,通过实现 write()
方法可以获得写入的数据:
const readableStream = new ReadableStream({
write(value) {
console.log(value);
},
});
WritableStreamDefaultWriter
要把获得的数据写入流,可以通过流的 getWriter()
方法获取 WritableStreamDefaultWriter 的实例。这样会获得流的锁,确保只有一个写入器可以向流中写入数据:
async function* ints() {
// 每 1000 毫秒生成一个递增的 整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
const writableStream = new WritableStream({
write(value) {
console.log(value);
},
});
console.log(writableStream.locked); // false
const writableStreamDefaultWriter = writableStream.getWriter();
console.log(writableStream.locked); // true
在向流中写入数据前,生产者必须确保写入器可以接收值。writableStreamDefaultWriter.ready
返回一个期约,此期约会在能够向流中写入数据时解决。然后,就可以把值传给 writableStreamDefaultWriter.write()
方法。写入数据之后,调用 writableStreamDefaultWriter.close()
将流关闭:
// 生产者
(async function () {
for await (let chunk of ints()) {
await writableStreamDefaultWriter.ready;
writableStreamDefaultWriter.write(chunk);
}
writableStreamDefaultWriter.close();
})();
转换流
转换流用于组合可读流和可写流。数据块在两个流之间的转换是通过 transform()
方法完成的。
TransformStream
TransformStream()
构造函数创建一个新的 TransformStream 对象,该对象表示一对流:一个 WritableStream 表示可写端,和一个 ReadableStream 表示可读端。
new TransformStream();
new TransformStream(transformer);
new TransformStream(transformer, writableStrategy);
new TransformStream(transformer, writableStrategy, readableStrategy);
transformer
一个表示 transformer 的对象。如果未提供,则生成的流将是一个恒等变换流,它将所有写入可写端的分块转发到可读端,不会有任何改变。
transformer
对象可以包含以下任何方法。每个方法的 controller 都是一个 TransformStreamDefaultController 实例。
-
start(controller)
当 TransformStream 被构造时调用。它通常用于使用 TransformStreamDefaultController.enqueue() 对分块进行排队。
-
transform(chunk, controller)
当一个写入可写端的分块准备好转换时调用,并且执行转换流的工作。如果没有提供
transform()
方法,则使用恒等变换,并且分块将在没有更改的情况下排队。 -
flush(controller)
当所有写入可写端的分块成功转换后被调用,并且可写端将会关闭。
writableStrategy
和 readableStrategy
对象一样,定义了队列策略的可选对象。它需要两个参数 :
-
highWaterMark
一个非负整数。它定义了在应用背压之前内部队列包含的分块的总数。
-
size(chunk)
一个包含参数 chunk 的方法。它表示用于每一个块的大小,以字节为单位。
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
const { writable, readable } = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
});
向转换流的组件流(可读流和可写流)传入数据和从中获取数据,与前面介绍的方法相同:
// 消费者
(async function () {
while (true) {
const { done, value } = await readableStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();
// 生产者
(async function () {
for await (let chunk of ints()) {
await writableStreamDefaultWriter.ready;
writableStreamDefaultWriter.write(chunk);
}
writableStreamDefaultWriter.close();
})();
通过管道连接流
流可以通过管道连接成一串。最常见的用例是使用 pipeThrough()
方法把 ReadableStream 接入 TransformStream。从内部看,ReadableStream 先把自己的值传给 TransformStream 内部的 WritableStream,然后执行转换,接着转换后的值又在新的 ReadableStream 上出现。
下面的例子将一个整数的 ReadableStream 传入 TransformStream,TransformStream 对每个值做加倍处理:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}
const integerStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});
const doublingStream = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
});
// 通过管道连接流
const pipedStream = integerStream.pipeThrough(doublingStream);
// 从连接流的输出获得读取器
const pipedStreamDefaultReader = pipedStream.getReader();
// 消费者
(async function () {
while (true) {
const { done, value } = await pipedStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();
// 0
// 2
// 4
// 6
// 8
另外,使用 pipeTo()
方法也可以将 ReadableStream 连接到 WritableStream。整个过程与使用 pipeThrough()
类似:
const writableStream = new WritableStream({
write(value) {
console.log(value);
},
});
const pipedStream = integerStream.pipeTo(writableStream);
// 0
// 1
// 2
// 3
// 4
注意,这里的管道连接操作隐式从 ReadableStream 获得了一个读取器,并把产生的值填充到 WritableStream。