你可以利用WHATWG标准流(自Node.js v17版本起成为其一部分),在发出前读取并缓存字节。采用这种模式还能增强代码的可移植性,使其能更容易地应用于其他环境(比如浏览器、Deno、Bun等)。
TransformStream(参见 Node.js 文档和 MDN)可以按指定的字节长度缓存数据,攒满该长度后再将其发出,示例如下:
buffered_byte_stream.mjs
import { TransformStream } from "node:stream/web";
/**
 * A TransformStream that regroups incoming bytes into fixed-size parts.
 *
 * Every chunk emitted on the readable side is a `Uint8Array` of exactly
 * `byteLength` bytes, except the final one, which holds whatever is left
 * over when the writable side is closed (and is omitted when nothing is left).
 *
 * Incoming chunks are expected to be `Uint8Array`s; an `ArrayBuffer` or any
 * iterable of byte values is also accepted.
 */
export class BufferedByteStream extends TransformStream {
  /**
   * @param {number} byteLength - Size in bytes of each emitted part.
   * @throws {RangeError} If `byteLength` is not a positive integer.
   */
  constructor(byteLength) {
    // A zero or fractional size would make the fill loop below never complete a part.
    if (!Number.isSafeInteger(byteLength) || byteLength <= 0) {
      throw new RangeError(
        `byteLength must be a positive integer, got ${String(byteLength)}`,
      );
    }

    // The part currently being filled; allocated lazily so an empty input
    // never allocates a full-size buffer.
    let pending = null;
    // Number of valid bytes at the start of `pending`.
    let filled = 0;

    super({
      transform(chunk, controller) {
        let bytes;
        if (chunk instanceof Uint8Array) {
          bytes = chunk;
        } else if (chunk instanceof ArrayBuffer) {
          bytes = new Uint8Array(chunk);
        } else {
          bytes = Uint8Array.from(chunk);
        }

        // Loop rather than test once: a single incoming chunk may complete
        // several parts, and each of them must be emitted at the exact size.
        let offset = 0;
        while (offset < bytes.length) {
          pending ??= new Uint8Array(byteLength);
          const take = Math.min(byteLength - filled, bytes.length - offset);
          pending.set(bytes.subarray(offset, offset + take), filled);
          filled += take;
          offset += take;

          if (filled === byteLength) {
            // Hand the full buffer downstream and start a fresh one next time.
            controller.enqueue(pending);
            pending = null;
            filled = 0;
          }
        }
      },
      flush(controller) {
        if (filled > 0) {
          // Copy so the emitted tail is backed by a buffer of its own length.
          controller.enqueue(pending.slice(0, filled));
        }
        pending = null;
        filled = 0;
      },
    });
  }
}
然后在你的主应用代码中使用它,大概会是这样的:
main.mjs
import * as fs from "node:fs/promises";
import * as path from "node:path";
import { WritableStream } from "node:stream/web";
import { BufferedByteStream } from "./buffered_byte_stream.mjs";
/**
 * Creates a WritableStream that stores every chunk it receives as its own
 * numbered part file.
 *
 * Parts are written into the directory of `filePath` and named
 * `<base>.partNNN`, where `<base>` is the file name of `filePath` and `NNN`
 * is a counter starting at 000, zero-padded to at least three digits.
 *
 * @param {string} filePath - Path whose directory and base name are used for the part files.
 * @returns {WritableStream} Sink that writes one file per chunk.
 */
function createWritableDestination(filePath) {
  // The directory and base name are the same for every part, so split the path once.
  const { base, dir } = path.parse(filePath);
  let index = 0;

  return new WritableStream({
    async write(u8Arr) {
      const fileName = `${base}.part${(index++).toString().padStart(3, "0")}`;
      await fs.writeFile(path.join(dir, fileName), u8Arr);
      // The message announces completion, so emit it only once the write has
      // succeeded; a failed write rejects above and is never reported as done.
      console.log(`下载完成: ${fileName}`);
    },
  });
}
// Driver script: splits every regular file found in `inputDir` into parts of
// at most `byteLengthLimit` bytes, written under `outputDir`.
const inputDir = "../in";
const outputDir = "../out";
const byteLengthLimit = 25e6; // 25 MB per part

const entries = await fs.readdir(inputDir, { withFileTypes: true });

// Directories and other non-file entries are skipped.
for (const entry of entries.filter((candidate) => candidate.isFile())) {
  const sourcePath = path.join(inputDir, entry.name);
  const destinationPath = path.join(outputDir, entry.name);

  const fileHandle = await fs.open(sourcePath);
  try {
    // Byte stream of the file's contents (ReadableStream<Uint8Array>).
    const fileBytes = fileHandle.readableWebStream({ type: "bytes" });
    const parts = fileBytes.pipeThrough(new BufferedByteStream(byteLengthLimit));
    await parts.pipeTo(createWritableDestination(destinationPath));
  } finally {
    // Release the handle whether or not the pipeline succeeded.
    await fileHandle.close();
  }
}
Code in TypeScript Playground