跳到主要内容

Streams API

Streams API 是为了解决一个简单但又基础的问题而生的:Web 应用如何消费有序的小信息块而不是大块信息?这种能力主要有两种应用场景。

大块数据可能不会一次性都可用。网络请求的响应就是一个典型的例子。网络负载是以连续信息包形式交付的,而流式处理可以让应用在数据一到达就能使用,而不必等到所有数据都加载完毕。

大块数据可能需要分小部分处理。视频处理、数据压缩、图像编码和 JSON 解析都是可以分成小部分进行处理,而不必等到所有数据都在内存中时再处理的例子。

理解流

提到流,可以把数据想像成某种通过管道输送的液体。JavaScript 中的流借用了管道相关的概念,因为原理是相通的。根据规范,“这些 API 实际是为映射低级 I/O 原语而设计,包括适当时候对字节流的规范化”。Stream API 直接解决的问题是处理网络请求和读写磁盘。

Stream API 定义了三种流。

  • 可读流:可以通过某个公共接口读取数据块的流。数据在内部从底层源进入流,然后由消费者(consumer)进行处理。

  • 可写流: 可以通过某个公共接口写入数据块的流。生产者(producer)将数据写入流,数据在内部传入底层数据槽(sink)。

  • 转换流: 由两种流组成,可写流用于接收数据(可写端),可读流用于输出数据(可读端)。这两个流之间是转换程序(transformer),可以根据需要检查和修改流内容。

块、内部队列和反压

流的基本单位是块(chunk)。块可是任意数据类型,但通常是定型数组。每个块都是离散的流片段,可以作为一个整体来处理。更重要的是,块不是固定大小的,也不一定按固定间隔到达。在理想的流当中,块的大小通常近似相同,到达间隔也近似相等。不过好的流实现需要考虑边界情况。

前面提到的各种类型的流都有入口和出口的概念。有时候,由于数据进出速率不同,可能会出现不匹配的情况。为此流平衡可能出现如下三种情形。

  • 流出口处理数据的速度比入口提供数据的速度快。流出口经常空闲(可能意味着流入口效率较 26 低),但只会浪费一点内存或计算资源,因此这种流的不平衡是可以接受的。
  • 流入和流出均衡。这是理想状态。
  • 流入口提供数据的速度比出口处理数据的速度快。这种流不平衡是固有的问题。此时一定会在某个地方出现数据积压,流必须相应做出处理。

流不平衡是常见问题,但流也提供了解决这个问题的工具。所有流都会为已进入流但尚未离开流的块提供一个内部队列。对于均衡流,这个内部队列中会有零个或少量排队的块,因为流出口块出列的速度与流入口块入列的速度近似相等。这种流的内部队列所占用的内存相对比较小。

如果块入列速度快于出列速度,则内部队列会不断增大。流不能允许其内部队列无限增大,因此它会使用反压(backpressure)通知流入口停止发送数据,直到队列大小降到某个既定的阈值之下。这个阈值由排列策略决定,这个策略定义了内部队列可以占用的最大内存,即高水位线(high water mark)。

可读流

可读流是对底层数据源的封装。底层数据源可以将数据填充到流中,允许消费者通过流的公共接口读取数据。

ReadableStream

ReadableStream() 构造函数创建并从给定的处理程序返回一个可读的流对象。

语法
/**
* underlyingSource 可选
* queuingStrategy 可选
*/
new ReadableStream();
new ReadableStream(underlyingSource);
new ReadableStream(underlyingSource, queuingStrategy);

underlyingSource 一个包含定义了构造流行为方法和属性的对象。如下列表出方法和属性:

  • start(controller)

    这是一个当对象被构造时立刻调用的方法。此方法的内容由开发人员定义,并应着眼于访问流,并执行其他任何必需的设置流功能。如果这个过程是异步完成的,它可以返回一个 promise,表明成功或失败。传递给这个方法的 controller 是一个 ReadableStreamDefaultController 或 ReadableByteStreamController,具体取决于 type 属性的值。开发人员可以使用此方法在设立期间控制流。

  • pull(controller)

    这个方法,也是由开发人员定义的,当流的内部队列不满时,会重复调用这个方法,直到队列补满。如果 pull() 返回一个 promise,那么它将不会再被调用,直到 promise 完成;如果 promise 失败,该流将会出现错误。传递给此方法的 controller 参数是 ReadableStreamDefaultController 或 ReadableByteStreamController,具体取决于 type 属性的值。由于更多的块被获取,这个方法可以被开发人员用来控制流。

  • cancel(reason)

    如果应用程序表示该流将被取消(例如,调用了 ReadableStream.cancel(),则将调用此方法,该方法也由开发人员定义。该方法应该做任何必要的事情来释放对流的访问。如果这个过程是异步的,它可以返回一个 promise,表明成功或失败。原因参数包含一个 DOMString,它描述了流被取消的原因。

  • type

    该属性控制正在处理的可读类型的流。如果它包含一个设置为 bytes 的值,则传递的控制器对象将是一个 ReadableByteStreamController,能够处理 BYOB(带你自己的缓冲区)/字节流。如果未包含,则传递的控制器将为 ReadableStreamDefaultController。

  • autoAllocateChunkSize

    对于字节流,开发人员可以使用正整数值设置 autoAllocateChunkSize 以打开流的自动分配功能。启用此功能后,流实现将自动分配一个具有给定整数大小的 ArrayBuffer,并调用底层源代码,就好像消费者正在使用 BYOB reader 一样。

queuingStrategy 一个可选择定义流的队列策略的对象。这需要两个参数:

  • highWaterMark

    非负整数 - 这定义了在应用背压之前可以包含在内部队列中的块的总数。

  • size(chunk)

    包含参数 chunk 的方法——这表示每个分块使用的大小(以字节为单位)。

const stream = new ReadableStream({
start(controller) {
interval = setInterval(() => {
let string = randomChars();

// Add the string to the stream
controller.enqueue(string);

// show it on the screen
let listItem = document.createElement('li');
listItem.textContent = string;
list1.appendChild(listItem);
}, 1000);

button.addEventListener('click', function () {
clearInterval(interval);
fetchStream();
controller.close();
});
},
pull(controller) {
// We don't really need a pull in this example
},
cancel() {
// This is called if the reader cancels,
// so we should stop generating strings
clearInterval(interval);
},
});

ReadableStreamDefaultController

ReadableStreamDefaultController 是 Streams API 中的一部分,用于控制 ReadableStream 的行为。ReadableStream 表示可从中读取数据的源,而 ReadableStreamDefaultController 提供了一些方法和属性,用于管理和控制这个流的生成和传输。

ReadableStreamDefaultController 用于控制一个 ReadableStream 实例的内部行为。它主要用于自定义流的实现,例如当你使用 new ReadableStream 构造一个新的流时,可以使用它来管理流的生产和消耗。

主要方法和属性

  • enqueue(chunk)

    将一个新的数据块加入到流的队列中。如果消费者(读取流的代码)还没有消耗数据,则会被添加到队列中等待消费。

  • close()

    关闭流,表示不再有更多的数据块。一旦调用了 close() 方法,流将进入结束状态,消费者不会再收到新的数据块。

  • error(e)

    将流置于错误状态,并将错误 e 传递给消费者。一旦调用了 error(e) 方法,流将无法再正常操作。

来看下面的生成器,它每 1000 毫秒就会生成一个递增的整数:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

这个生成器的值可以通过可读流的控制器传入可读流。访问这个控制器最简单的方式就是创建 ReadableStream 的一个实例,并在这个构造函数的 underlyingSource 参数(第一个参数)中定义 start()方法,然后在这个方法中使用作为参数传入的 controller。默认情况下,这个控制器参数是 ReadableStreamDefaultController 的一个实例:

const readableStream = new ReadableStream({
start(controller) {
console.log(controller); // ReadableStreamDefaultController {}
},
});

调用控制器的 enqueue() 方法可以把值传入控制器。所有值都传完之后,调用 close() 关闭流:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

const readableStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});

ReadableStreamDefaultReader

ReadableStreamDefaultReader 允许你从 ReadableStream 中读取数据。它是通过调用 ReadableStream 实例的 getReader 方法来创建的。

主要方法和属性

  • read()

    异步方法,从流中读取下一个数据块。返回一个 Promise,该 Promise 解析为一个对象,包含 value(数据块)和 done(布尔值,表示流是否已结束)。

  • releaseLock()

    释放读取器对流的锁定,使其他读取器可以锁定该流。释放锁定后,不能再通过该读取器读取数据。

  • closed()

    一个 Promise,当流关闭或发生错误时,该 Promise 解析或拒绝。

前面的例子把 5 个值加入了流的队列,但没有把它们从队列中读出来。为此,需要一个 ReadableStreamDefaultReader 的实例,该实例可以通过流的 getReader() 方法获取。调用这个方法会获得流的锁,保证只有这个读取器可以从流中读取值:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

const readableStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});

console.log(readableStream.locked); // false
const readableStreamDefaultReader = readableStream.getReader();
console.log(readableStream.locked); // true

消费者使用这个读取器实例的 read() 方法可以读出值:

(async function () {
while (true) {
const { done, value } = await readableStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();

// 0
// 1
// 2
// 3
// 4

可写流

可写流是底层数据槽的封装。底层数据槽处理通过流的公共接口写入的数据。

WritableStream

WritableStream() 构造函数创建一个新的 WritableStream 对象实例。

new WritableStream();
new WritableStream(underlyingSink);
new WritableStream(underlyingSink, queuingStrategy);

underlyingSink 一个包含方法和属性的对象,这些方法和属性定义了构造的流的实例的具体行为。underlyingSource 可以包括:

  • start(controller)
  • write(chunk, controller)
  • close(controller)
  • abort(reason)

queuingStrategy 一个可选的定义流的队列策略的对象。这需要两个参数:

  • highWaterMark
  • size(chunk)

来看下面的生成器,它每 1000 毫秒就会生成一个递增的整数:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

这些值通过可写流的公共接口可以写入流。在传给 WritableStream 构造函数的 underlyingSink 参数中,通过实现 write() 方法可以获得写入的数据:

const readableStream = new ReadableStream({
write(value) {
console.log(value);
},
});

WritableStreamDefaultWriter

要把获得的数据写入流,可以通过流的 getWriter() 方法获取 WritableStreamDefaultWriter 的实例。这样会获得流的锁,确保只有一个写入器可以向流中写入数据:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

const writableStream = new WritableStream({
write(value) {
console.log(value);
},
});

console.log(writableStream.locked); // false
const writableStreamDefaultWriter = writableStream.getWriter();
console.log(writableStream.locked); // true

在向流中写入数据前,生产者必须确保写入器可以接收值。writableStreamDefaultWriter.ready 返回一个期约,此期约会在能够向流中写入数据时解决。然后,就可以把值传给 writableStreamDefaultWriter.write() 方法。写入数据之后,调用 writableStreamDefaultWriter.close() 将流关闭:

// 生产者
(async function () {
for await (let chunk of ints()) {
await writableStreamDefaultWriter.ready;
writableStreamDefaultWriter.write(chunk);
}
writableStreamDefaultWriter.close();
})();

转换流

转换流用于组合可读流和可写流。数据块在两个流之间的转换是通过 transform() 方法完成的。

TransformStream

TransformStream() 构造函数创建一个新的 TransformStream 对象,该对象表示一对流:一个 WritableStream 表示可写端,和一个 ReadableStream 表示可读端。

语法
new TransformStream();
new TransformStream(transformer);
new TransformStream(transformer, writableStrategy);
new TransformStream(transformer, writableStrategy, readableStrategy);

transformer 一个表示 transformer 的对象。如果未提供,则生成的流将是一个恒等变换流,它将所有写入可写端的分块转发到可读端,不会有任何改变。

transformer 对象可以包含以下任何方法。每个方法的 controller 都是一个 TransformStreamDefaultController 实例。

  • start(controller)

    当 TransformStream 被构造时调用。它通常用于使用 TransformStreamDefaultController.enqueue() 对分块进行排队。

  • transform(chunk, controller)

    当一个写入可写端的分块准备好转换时调用,并且执行转换流的工作。如果没有提供 transform() 方法,则使用恒等变换,并且分块将在没有更改的情况下排队。

  • flush(controller)

    当所有写入可写端的分块成功转换后被调用,并且可写端将会关闭。

writableStrategyreadableStrategy 对象一样,定义了队列策略的可选对象。它需要两个参数:

  • highWaterMark

    一个非负整数。它定义了在应用背压之前内部队列包含的分块的总数。

  • size(chunk)

    一个包含参数 chunk 的方法。它表示用于每一个块的大小,以字节为单位。

创建了一个 TransformStream 的实例,通过 transform() 方法将每个值翻倍:
async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

const { writable, readable } = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
});

向转换流的组件流(可读流和可写流)传入数据和从中获取数据,与前面介绍的方法相同:

// 消费者
(async function () {
while (true) {
const { done, value } = await readableStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();

// 生产者
(async function () {
for await (let chunk of ints()) {
await writableStreamDefaultWriter.ready;
writableStreamDefaultWriter.write(chunk);
}
writableStreamDefaultWriter.close();
})();

通过管道连接流

流可以通过管道连接成一串。最常见的用例是使用 pipeThrough() 方法把 ReadableStream 接入 TransformStream。从内部看,ReadableStream 先把自己的值传给 TransformStream 内部的 WritableStream,然后执行转换,接着转换后的值又在新的 ReadableStream 上出现。

下面的例子将一个整数的 ReadableStream 传入 TransformStream,TransformStream 对每个值做加倍处理:

async function* ints() {
// 每 1000 毫秒生成一个递增的整数
for (let i = 0; i < 5; ++i) {
yield await new Promise(resolve => setTimeout(resolve, 1000, i));
}
}

const integerStream = new ReadableStream({
async start(controller) {
for await (let chunk of ints()) {
controller.enqueue(chunk);
}
controller.close();
},
});

const doublingStream = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
});

// 通过管道连接流
const pipedStream = integerStream.pipeThrough(doublingStream);
// 从连接流的输出获得读取器
const pipedStreamDefaultReader = pipedStream.getReader();

// 消费者
(async function () {
while (true) {
const { done, value } = await pipedStreamDefaultReader.read();
if (done) {
break;
} else {
console.log(value);
}
}
})();

// 0
// 2
// 4
// 6
// 8

另外,使用 pipeTo() 方法也可以将 ReadableStream 连接到 WritableStream。整个过程与使用 pipeThrough() 类似:

const writableStream = new WritableStream({
write(value) {
console.log(value);
},
});

const pipedStream = integerStream.pipeTo(writableStream);

// 0
// 1
// 2
// 3
// 4

注意,这里的管道连接操作隐式从 ReadableStream 获得了一个读取器,并把产生的值填充到 WritableStream。