基于Service Worker实现WebRTC局域网大文件传输能力
Service Worker
是一种驻留在用户浏览器后台的脚本,能够拦截和处理网络请求,从而实现丰富的离线体验、缓存管理和网络效率优化。请求拦截是其关键功能之一,通过监听fetch
事件,Service Worker
可以捕获所有向网络发出的请求,并有选择地处理这些请求,例如从缓存中读取响应,或者对请求进行修改和重定向,进而实现可靠的离线浏览和更快速的内容加载。
描述
在前段时间,在群里看到有人提了一个问题,在从对象存储下载文件的时候,为什么实现了携带了一个GitHub Pages
的地址,理论上而言我们从对象存储下载内容直接点连接就好了,然而这里竟然看起来似乎还有一个中间环节,像是需要被GitHub Pages
拦截并中转才下载到本地,链接地址类似于下面的内容。此时如果我们在下载页面点击下载后,再打开浏览器的下载管理功能,可以发现下载地址实际上会变成一个更加奇怪的地址,而这个地址我们实际上直接在浏览器打开会响应404
。
<!-- 下载页面 --> https://jimmywarting.github.io/StreamSaver.js/examples/saving-a-blob.html <!-- 浏览器下载管理 --> https://jimmywarting.github.io/StreamSaver.js/jimmywarting.github.io/712864/sample.txt
从链接中我们可以明显地看出这里是使用了StreamSaver.js
来作为下载文件的中间环节,从README
中我们可以看出StreamSaver.js
是基于ServiceWorker
的大文件下载方案。于是前段时间有时间将其实现研究了一番,通常我们需要调度文件下载时,可能会直接通过<a />
标签在浏览器中直接打开目标链接便可以开始下载,然而这种方式有比较明显的三个问题:
- 如果直接打开的资源是图片、视频等浏览器能够直接解析的资源,那么此时浏览器不会触发下载行为,而是会直接在浏览器中预览打开的资源,即默认的
Content-Disposition
值是inline
,不会触发值为attachment
的下载行为。当然,使用<a />
标签的download
可以解决这个问题,然而这个属性只有在同源URL
、blob:
和data:
协议下才会生效。 - 如果我们上传到对象存储的文件存在重名资源的问题,那么为了防止文件被覆盖,我们可能会随机生成资源名或者在资源后面加上时间戳,甚至直接将文件名生成不带扩展名的
HASH
值。那么在文件下载的时候,我们就需要将文件名实际还原回来,然而这个过程仍然需要依赖响应的attachment; filename=
,或者<a />
标签的download
属性来重命名文件。 - 如果我们请求的资源是需要校验权限才能正常下载,那么直接使用
<a />
标签进行资源请求的时候则仅仅是发起了GET
请求,而且将密钥放置于请求的链接地址上显然是做不到真正的权限校验的。当然通过签发临时的Token
并返回GET
请求地址当然是可行的,但如果涉及到更复杂一些的权限控制以及审计追踪时,生成临时下载链接可能并不足以做到高安全性的要求,类似的问题在EventSource
对象实现的SSE
中更加明显。
而在我们的项目中,恰好存在这样的历史遗留问题,我们的资源文件都会存储在OSS-Object Storage Service
对象存储中,并且为了防止资源重名的问题,默认的资源策略是完全不携带文件的扩展名,而是直接将文件名生成HASH
值,而且由于域名是基建自带的CDN
加速域名,不能通过配置CNAME
来定义为我们站点的域名,也就是说我们的资源必然存在跨域的问题,这就相当于把所有的限制都触及到了。
那么在这种情况下,我们是需要将文件重命名为原本的资源名称的,毕竟在不存在扩展名的情况下操作系统不能识别出文件内容,而我们的CDN
资源是不存在Content-Disposition
响应头以及原始资源名称的,而且文件也不是同域名下的资源。在这种情况下我们需要实现跨域情况下的资源重命名,由此来支持用户的下载行为,所以我们在这里采取的方案是首先使用fetch
将文件下载到内存,然后通过createObjectURL
将其创建为blob:
协议的资源,由此来支持<a />
标签的download
属性。
通过这种方式下载文件则又出现了另一个问题,将文件全部下载后都存在内存中可能会存在OOM
的现象,对于现代浏览器来说并没有非常明确的单个Tab
页的内存限制,而是根据系统资源动态分配的,但是只要在内存中下载足够大的文件,还是会触发OOM
导致浏览器页面崩溃。那么在这种情况下,通过将Service Worker
作为中间人拦截下载请求,并且在响应的Header
中加入Content-Disposition
来支持文件重命名,并且可以通过Stream API
来实现流式的下载行为,由此避免全部将文件下载到内存当中。总结来说,在这里我们通过这种方式解决了两个问题:
- 跨域资源的下载,通过劫持请求并增加相应头的方式,解决了跨域资源的重命名问题,并以此来直接调度浏览器
IO
来实现下载。 - 避免内存溢出问题,通过
Stream API
将fetch
请求的数据分片写入文件,以此来做到流式下载,避免将文件全部写入到内存中。
那么除了在对象存储下载文件之外,这种数据处理方式还有很多应用场景,例如我们需要批量下载文件并且压缩时,可以主动fetch
后通过ReadableStream
读,并且pipe
到类似压缩的实现中,例如zlib.createDeflateRaw
的浏览器方案,再pipe
到WritableStream
中类似FileSystemFileHandle.createWritable
以此来实时写入文件,这样就可以做到高效的文件读写,而不需要将其全部持有在内存中。
恰好在先前我们基于WebRTC实现了局域网文件传输,而通过WebRTC
传输的文件也会同样需要面对大文件传输的问题,并且由于其本身并不是HTTP
协议,自然就不可能携带Content-Disposition
等响应头。这样我们的大文件传输就必须要借助中间人的方式进行拦截,此时我们通过模拟HTTP
请求的方式来生成虚拟的下载链接,并且由于本身就是分片传输,我们可以很轻松地借助Stream API
来实现流式下载能力。那么本文就以WebRTC
的文件传输为基础,来实现基于Service Worker
的大文件传输方案,文中的相关实现都在https://github.com/WindrunnerMax/FileTransfer
中。
Stream API
浏览器实现的Stream API
中存在ReadableStream
、WritableStream
、TransformStream
三种流类型,其中ReadableStream
用以表示可读的流,WritableStream
用以表示可写的流,而TransformStream
用以表示可读写的流。由于在浏览器中Stream
的实现时间与机制并不相同,ReadableStream
的兼容性与Fetch API
基本一致,而WritableStream
和TransformStream
的兼容性则相对稍差一点。
数据流动
在最开始接触Stream API
的时候,我难以理解整个管道的数据流,针对于缓冲区以及背压等问题本身是不难理解的,但是在实际将Stream
应用的时候,我发现并不能理解整个流的模型的数据流动方向。在我的理解中,整个管道应该是以WritableStream
起始用以写入/生产数据,而后继的管道则应该使用ReadableStream
来读取/消费数据,而整个连接过程则可以通过pipeTo
链接起来。
const writable = new WritableStream(); const readable = new ReadableStream(); writable.pipeTo(readable); // TypeError: writable.pipeTo is not a function const writer = writable.getWriter(); const reader = readable.getReader(); // ... writer.write("xxx"); reader.read().then(({ value, done }) => { console.log(value, done); });
当然这是个错误的示例,针对于流的理解我们应该参考Node.js
的Stream
模块,以node:fs
的createReadStream
与createWriteStream
为例,我们会更容易理解整个模型。我们的Stream
模型是以ReadableStream
为起始,即数据生产是以Node.js
本身的IO
为基础的读取文件,将内容写入到ReadableStream
中,而我们作为数据处理者,则是在其本身的事件中进行数据处理,进而将处理后的数据写入WritableStream
来消费,即后继的管道是以WritableStream
为终点。
const fs = require("node:fs"); const path = require("node:path"); const sourceFilePath = path.resolve("./source.txt"); const destFilePath = path.join("./destination.txt"); const readStream = fs.createReadStream(sourceFilePath, { encoding: "UTF-8" }); const writeStream = fs.createWriteStream(destFilePath, { encoding: "UTF-8" }); readStream.on("data", chunk => { writeStream.write(chunk); }); readStream.on("end", () => { writeStream.end(); });
那么在浏览器中,我们的Stream API
同样是以ReadableStream
为起始,Fetch API
的Response.body
就是很好的示例,数据的起始同样是以IO
为基础的网络请求。在浏览器中我们的ReadableStream
的API
与Node.js
本身还是有些不同的,例如在浏览器ReadableStream
的Reader
并不存在类似on("data", () => null)
的事件监听,而前边的例子只是为了让我们更好地理解整个流模型,在这里我们当然是以浏览器的API
为主。
聊了这么多关于Stream API
的问题,我们回到针对于WebRTC
传递的数据实现,针对于类似Fetch
的数据传输,是借助浏览器本身的IO
来控制ReadableStream
的数据生产,而我们的WebRTC
仅仅是传输通道,因此在管道的初始数据生产时,ReadableStream
是需要我们自己来控制的,因此我们最开始想到的Writable -> Readable
方式,则是为了适应这部分实现。而实际上这种方式实际上更契合于TransformStream
的模型,其本身的能力是对数据流进行转换,而我们同样可以借助TransformStream
来实现流的读写。
const transformStream = new TransformStream<number, number>({ transform(chunk, controller) { controller.enqueue(chunk + 1); }, }); const writer = transformStream.writable.getWriter(); const reader = transformStream.readable.getReader(); const process = (res: { value?: number; done: boolean }) => { const { value, done } = res; console.log(value, done); if (done) return; reader.read().then(process); }; reader.read().then(process); writer.write(1); writer.write(2); writer.close();
那么在这里我们就可以实现对于ReadableStream
的数据处理,在基于WebRTC
的数据传输实现中,我们可以获取到DataChannel
的数据流本身,那么此时我们就可以通过ReadableStream
的Controller
来向缓冲队列中置入数据,以此来实现数据的写入,而后续的数据消费则可以使用ReadableStream
的Reader
来实现,这样我们就可以借助缓冲队列实现流式的数据传输。
const readable = new ReadableStream<number>({ start(controller) { controller.enqueue(1); controller.enqueue(2); controller.close(); }, }); const reader = readable.getReader(); const process = (res: { value?: number; done: boolean }) => { const { value, done } = res; console.log(value, done); if (done) return; reader.read().then(process); }; reader.read().then(process);
背压问题
那么在这里我们可以思考一个问题,如果此时我们的DataChannel
的数据流的传输速度非常快,也就是不断地将数据enqueue
到队列当中,而假如此时我们的消费速度非常慢,例如我们的硬盘写入速度比较慢,那么数据的队列就会不断增长,那么就可能导致内存溢出。实际上这个问题有专业的术语来描述,即Back Pressure
背压问题,在ReadableStream
中我们可以通过controller.desiredSize
来获取当前队列的大小,以此来控制数据的生产速度,以此来避免数据的积压。
const readable = new ReadableStream<number>({ start(controller) { console.log(controller.desiredSize); // 1 controller.enqueue(1); console.log(controller.desiredSize); // 0 controller.enqueue(2); console.log(controller.desiredSize); // -1 controller.close(); } });
而对于背压问题, 我们可以很简单地理解到,当我们的数据生产速度大于数据消费速度时,就会导致数据的积压,那么针对于ReadableStream
与WritableStream
,我们可以分别得到相关的排队策略,实际上我们也能够很容易理解到背压所谓的压力都是来自于缓冲队列中未消费的块,当然我们也可以预设比较大的缓冲队列长度,只不过这样虽然避免了desiredSize
为负值,但是并不能解决背压问题。
- 对于
ReadableStream
,背压来自于已入队但尚未读取的块。 - 对于
WritableStream
,背压来自于已写入但尚未由底层接收器处理的块。
而在先前的ReadableStream
实现中,我们可以很明显地看到其本身并没有携带背压的默认处理机制,即使我们可以通过desiredSize
来判断当前内置队列的压力,但是我们并不能很明确地反馈数据的生产速度,我们更希望基于事件驱动来控制而不是类似于setTimeout
来轮训检查,当然我们也可以通过pull
方法来被动控制队列的数据量。而在WritableStream
中则存在内置的背压处理方法即writer.ready
,通过这个方法我们可以判断当前队列的压力,以此来控制数据的生产速度。
(async () => { const writable = new WritableStream(); const writer = writable.getWriter(); await writer.write(1); await writer.write(1); await writer.write(1); console.log("written"); // written await writer.ready; await writer.write(1); console.log("written"); // Nil })();
因此在我们的WebRTC
数据传输中,为了方便地处理背压问题,我们是通过TransformStream
的writable
端来实现数据的写入,而消费则是通过readable
端来实现的,这样我们就可以很好地控制数据的生产速度,并且可以在主线程中将TransformStream
定义后,将readable
端通过postMessage
将其作为Transferable Object
传递到Service Worker
中消费即可。
// packages/webrtc/client/worker/event.ts export class WorkerEvent { public static start(fileId: string, fileName: string, fileSize: number, fileTotal: number) { const ts = new TransformStream(); WorkerEvent.channel.port1.postMessage( { key: MESSAGE_TYPE.TRANSFER_START, id: fileId, readable: ts.readable, } as MessageType, [ts.readable] ); } public static async post(fileId: string, data: ArrayBuffer) { const writer = WorkerEvent.writer.get(fileId); if (!writer) return void 0; await writer.ready; return writer.write(new Uint8Array(data)); } public static close(fileId: string) { WorkerEvent.channel?.port1.postMessage({ key: MESSAGE_TYPE.TRANSFER_CLOSE, id: fileId, } as MessageType); const writer = WorkerEvent.writer.get(fileId); writer?.close(); } }
Fetch
在Fetch API
的Response
对象中,存在Response.body
属性用以获取响应的ReadableStream
,与上述对象一致同样用以表示可读的流。通过这个接口我们可以实现流式的读取数据,而不需要一次性将所有数据读取到内存中,以此来渐进式地处理数据,例如在使用fetch
实现SSE - Server-Sent Events
的响应时,便可以通过维持长链接配合ReadableStream
来实现数据的响应。
针对于Fetch
方法,在接触Stream API
之前我们可能主要的处理方式是调用res.json()
等方法来读取数据,实际上这些方法同样会在其内部实现中隐式调用ReadableStream.getReader()
来读取数据。而在Stream API
出现之前,如果我们想要处理某种资源例如视频、文本文件等,我们必须下载整个文件,等待它反序列化为合适的格式,然后直接处理所有数据。
因此在先前调研StreamSaver.js
时,我比较费解的一个问题就是,既然我们请求的数据依然是需要从全部下载到内存中,那么在这种情况下我们使用StreamSaver.js
依然无法做到流式地将数据写入硬盘,依然会存在浏览器Tab
页的内存溢出问题。而在了解到Fetch API
的Response.body
属性后,关于整个流的处理方式就变得清晰了,我们可以不断地调用read()
方法将数据传递到Service Worker
调度下载即可。
因此调度文件下载的方式大概与上述的WebRTC
传输方式类似,在我们已经完成劫持数据请求的中间人Service Worker
之后,我们只需要在主线程部分发起fetch
请求,然后在响应数据时通过Iframe
发起劫持的下载请求,然后通过Response.body.getReader()
分片读取数据,并且不断将其写入到TransformStream
的Writer
中即可,此外我们还可以实现一些诸如下载进度之类的效果。
const fileId = "xxxxxx"; const worker = await navigator.serviceWorker.getRegistration("./"); const channel = new MessageChannel(); worker.active.postMessage({ type: "INIT_CHANNEL" }, [channel.port2]); const ts = new TransformStream(); channel.port1.postMessage( { key: "TRANSFER_START", id: fileId, readable: ts.readable, }, [ts.readable] ); const src = `/${fileId}` + `?X-File-Id=${fileId}` + `&X-File-Size=42373` + `&X-File-Total=1` + `&X-File-Name=favicon.ico`; const iframe = document.createElement("iframe"); iframe.hidden = true; iframe.src = src; iframe.id = fileId; document.body.appendChild(iframe); const writer = ts.writable.getWriter(); fetch("./favicon.ico").then(res => { const reader = res.body.getReader(); const process = (res) => { const { value, done } = res; if (done) { writer.close(); return; } writer.write(value); reader.read().then(process); }; reader.read().then(process); });
Service Worker
Service Worker
作为一种运行在后台的独立线程,具备充当网络请求中间人的能力,能够拦截、修改甚至完全替换网络请求和响应,从而实现高级功能如缓存管理、提升性能、离线访问、以及对请求进行细粒度的控制和优化。在这里我们就可以借助Service Worker
为我们的请求响应加入Content-Disposition
等响应头,以此来触发浏览器的下载能力,借助浏览器的IO
实现大文件的下载。
环境搭建
在通过Service Worker
实现中间人拦截网络请求之前,我们可以先看一下在Service Worker
中搭建TS
环境以及Webpack
的配置。我们平时TS
开发的环境的lib
主要是dom
、dom.iterable
、esnext
,而由于Worker
中的全局变量以及持有的方法并不相同,因此其本身的lib
环境需要改为WebWorker
、ESNext
,且如果不主动引入或者导出模块,TS
会认为其是作为d.ts
使用,因此即使我们在没有默认导入导出的情况下也要默认导出个空对象,而在有导入的情况下则需要注意将其在tsconfig
中include
相关模块。
// packages/webrtc/client/worker/index.ts /// <reference lib="esnext" /> /// <reference lib="webworker" /> declare let self: ServiceWorkerGlobalScope; export {};
Service Worker
本身作为独立的Js
文件,其必须要在同源策略下运行,这里如果需要关注部署环境的路由环境的话,需要将其配置为独立的路由加载路径。而对于我们的静态资源本身来说则需要将我们实现的独立Worker
作为入口文件配置到打包工具中,并且为了方便处理SW
是否注册以及缓存更新,通常我们都是将其固定为确定的文件名,以此来保证其在缓存中的唯一性。
// packages/webrtc/rspack.client.js /** * @type {import("@rspack/cli").Configuration} */ const Worker = { context: __dirname, entry: { worker: "./client/worker/index.ts", }, devtool: isDev ? "source-map" : false, output: { clean: true, filename: "[name].js", path: path.resolve(__dirname, "build/static"), }, }; module.exports = [/** ... */, Worker];
在Service Worker
中,我们可以在其install
事件和activate
事件中分别处理其安装与激活的逻辑,通常新的Service Worker
安装完成后会进入等待阶段,直到旧的Service Worker
被完全卸载后再进行激活,因此我们可以直接在onInstall
时skipWaiting
,在onActive
事件中,我们可以通过clients.claim
在激活后立即接管所有的客户端页面,无需等待页面刷新,这对于我们调试SW
的时候非常有用。
// packages/webrtc/client/worker/index.ts self.addEventListener("install", () => { self.skipWaiting(); console.log("Service Worker Installed"); }); self.addEventListener("activate", event => { event.waitUntil(self.clients.claim()); console.log("Service Worker Activate"); });
请求拦截
接下来我们就要来研究一下Service Worker
的拦截网络请求能力了,在MDN
中存在对于Fetch Event
的详细描述,而且Fetch Event
是仅能够在Service Worker
中使用的。而在这里我们的拦截请求与响应则非常简单,我们只需要从请求的地址中获取相关信息,即id
、name
、size
、total
,然后通过ReadableStream
构造Response
作为响应即可,这里主要需要关注的是Content-Disposition
与Content-Length
两个响应头,这是我们触发下载的关键配置。
// packages/webrtc/client/worker/index.ts self.onfetch = event => { const url = new URL(event.request.url); const search = url.searchParams; const fileId = search.get(HEADER_KEY.FILE_ID); const fileName = search.get(HEADER_KEY.FILE_NAME); const fileSize = search.get(HEADER_KEY.FILE_SIZE); const fileTotal = search.get(HEADER_KEY.FILE_TOTAL); if (!fileId || !fileName || !fileSize || !fileTotal) { return void 0; } const transfer = map.get(fileId); if (!transfer) { return event.respondWith(new Response(null, { status: 404 })); } const [readable] = transfer; const newFileName = encodeURIComponent(fileName).replace(/['()]/g, escape).replace(/*/g, "%2A"); const responseHeader = new Headers({ [HEADER_KEY.FILE_ID]: fileId, [HEADER_KEY.FILE_SIZE]: fileSize, [HEADER_KEY.FILE_NAME]: newFileName, "Content-Type": "application/octet-stream; charset=utf-8", "Content-Security-Policy": "default-src 'none'", "X-Content-Security-Policy": "default-src 'none'", "X-WebKit-CSP": "default-src 'none'", "X-XSS-Protection": "1; mode=block", "Cross-Origin-Embedder-Policy": "require-corp", "Content-Disposition": "attachment; filename*=UTF-8''" + newFileName, "Content-Length": fileSize, }); const response = new Response(readable, { headers: responseHeader, }); return event.respondWith(response); }
在这里还有一件有趣的事情,在上面的实现中我们可以看到对于从请求地址中取得相关信息的检查,如果检查不通过则返回undefined
。这实际上是个很常见的拦截Case
,即不符合条件的请求我们直接放行即可,而在之前我一直比较纳闷的问题是,任何经过Service Worker
拦截的请求都会在我们的Network
控制台面板中出现带着齿轮符号的请求,也就是从Service Worker
中发起的请求,这样在调试的时候会显得非常混乱。
实际上这就单纯是我们使用出现了问题,从提示信息能够明显地看出来这是从Service Worker
中发起的请求,而实际上这个请求我们直接让其通过原本的链路请求即可,不需要从Service Worker
中实际代理,而触发这个请求条目的主要原因是我们调用了fetch
方法,而无论是直接返回fetch
还是通过event.respondWith(fetch)
都会触发这个请求条目,因此我们在拦截请求的时候,如果不符合条件则直接返回undefined
即可。
// 会再次发起请求 return fetch(event.request); return event.respondWith(fetch(event.request)); // 不会再次发起请求 return ;
那么我们需要接着思考一个问题,应该如何触发下载,这里的Service Worker
仅仅是拦截了请求,而在WebRTC
的传输中并不会实际发起任何HTTP
请求,因此我们需要主动触发这个请求,得益于Service Worker
可以拦截几乎所有的请求,包括静态资源、网络请求等,因此我们可以直接借助创建Iframe
的方式配合约定好的字段名来实现下载,在这里实际上就是我们最开始提到的那个比较奇怪的链接地址了。
// packages/webrtc/client/worker/event.ts const src = `/${fileId}` + `?${HEADER_KEY.FILE_ID}=${fileId}` + `&${HEADER_KEY.FILE_SIZE}=${fileSize}` + `&${HEADER_KEY.FILE_TOTAL}=${fileTotal}` + `&${HEADER_KEY.FILE_NAME}=${fileName}`; const iframe = document.createElement("iframe"); iframe.hidden = true; iframe.src = src; iframe.id = fileId; document.body.appendChild(iframe);
在这里我们可能会好奇一个问题,为什么我们的请求信息是从URL
上获取,而不是直接在原始请求的时候就构造完成相关的Header
信息,在Service Worker
中直接将约定的响应头直接转发即可,也就是说为什么要用Iframe
而不是fetch
请求并且携带请求头的方式来实现下载。实际上这是因为即使存在了"Content-Disposition": "attachment; xxx"
响应头,fetch
请求也不支持直接发起下载能力。
实际上在这里我还研究了一下StreamSaver.js
的实现,这同样是个很有趣的事情,StreamSaver.js
的运行环境本身就是个Iframe
即mitm.html
,那么我们姑且将其称为B.html
,那么此时我们的主线程称其为A.html
。此时我们在B
中注册名为B.js
的Service Worker
,之后我们通过python3 -m http.server 9000
等方式作为服务资源打开A
的地址,新器端口9001
打开B
的地址,保证其存在跨域的情况。
<!-- A.html --> <iframe src="http://localhost:9001/B.html" hidden></iframe> <!-- B.html --> <script> navigator.serviceWorker.register("./B.js", { scope: "./" }); </script>
// B.js self.onfetch = (e) => { console.log(e.request.url); if (e.request.url.includes("ping")) { e.respondWith(new Response("pong")); } };
此时我们在A.html
中创建新的iframe
地址localhost:9001/ping
,也就是类似于在StreamSaver.js
创建出的临时下载地址那种,我们可以发现这个地址竟然可以被监听到,即Service Worker
可以拦截到这个请求,当时觉得这件事很神奇因为在不同域名的情况下理论上不可能被拦截的,本来以为发现了什么iframe
的特性,最后发现我们访问的是9001
的源地址,也就是相当于还是在B.html
源下的资源,如果此时我们访问的是9000
的资源则不会有这个效果了。
const iframe = document.createElement("iframe"); iframe.hidden = true; iframe.src = "http://localhost:9001/ping"; document.body.appendChild(iframe);
此外实际上如果我们在浏览器的地址栏中直接打开http://localhost:9001/ping
也是同样可以得到pong
的响应的,也就是说Service Worker
的拦截范围是在注册的scope
范围内,那么实际上如果有必要的话,我们则完全可以基于SW
来实现离线的PWA
应用,而不需要依赖于服务器响应的路由以及接口。此外,这个效果在我们的WebRTC
实现的SW
中也是存在的,而当我们再次点击下载链接无法得到响应,是由于我们检查到transfer
不存在,直接响应了404
。
const transfer = map.get(fileId); if (!transfer) { return event.respondWith(new Response(null, { status: 404 })); }
数据通信
言归正传,接下来我们就需要实现与Service Worker
的通信方案了,这里的实现就比较常规了。首先我们要注册Service Worker
,在同一个Scope
下只能注册一个Service Worker
,如果在同一个作用域内注册多个Service Worker
,那么后注册的Service Worker
会覆盖先注册的Service Worker
,当然这个问题不存在WebWorker
中。在这里我们借助getRegistration
与register
分别来获取当前活跃的Service Worker
以及注册新的Service Worker
。
// packages/webrtc/client/worker/event.ts if (!navigator.serviceWorker) { console.warn("Service Worker Not Supported"); return Promise.resolve(null); } try { const serviceWorker = await navigator.serviceWorker.getRegistration("./"); if (serviceWorker) { WorkerEvent.worker = serviceWorker; return Promise.resolve(serviceWorker); } const worker = await navigator.serviceWorker.register( process.env.PUBLIC_PATH + "worker.js?" + process.env.RANDOM_ID, { scope: "./" } ); WorkerEvent.worker = worker; return worker; } catch (error) { console.warn("Service Worker Register Error", error); return Promise.resolve(null); }
在与Service Worker
数据通信方面,我们可以借助MessageChannel
来实现。MessageChannel
是一个双向通信的通道,可以在两个不同的Context
中传递消息,例如在主线程与Worker
线程之间进行数据通信。我们只需要在主线程中创建一个MessageChannel
,然后将其port2
端口通过postMessage
传递给Service Worker
,而Service Worker
则可以通过event.ports[0]
获取到这个port2
,此后我们就可以借助这两个port
直接通信了。
或许我们会思考一个问题,为什么我们可以将port2
传递到Service Worker
中,理论上而言我们的postMessage
只能传递可序列化Structured Clone
的对象,例如字符串、数字等数据类型,而port2
本身是作为不可序列化的对象存在的。那么这里就涉及到了Transferable objects
的概念,可转移的对象是拥有属于自己的资源的对象,这些资源可以从一个上下文转移到另一个,确保资源一次仅在一个上下文中可用,在传输后原始对象不再可用,其不再指向转移后的资源,并且任何读取或者写入该对象的尝试都将抛出异常。
// packages/webrtc/client/worker/event.ts if (!WorkerEvent.channel) { WorkerEvent.channel = new MessageChannel(); WorkerEvent.channel.port1.onmessage = event => { console.log("WorkerEvent", event.data); }; WorkerEvent.worker?.active?.postMessage({ type: MESSAGE_TYPE.INIT_CHANNEL }, [ WorkerEvent.channel.port2, ]); }
因为在这里我们暂时不需要接收来自Service Worker
的消息,因此在这里我们对于port1
接收的消息只是简单地打印了出来。而在初始化CHANNEL
的时候,我们将port2
作为可转移对象放置到了第二个参数中,以此在Service Worker
中便可以接收到这个port2
,由于我们以后的信息传递都是由MessageChannel
进行,因此这里的onmessage
作用就是很单纯的接收port2
对象端口。
// packages/webrtc/client/worker/index.ts self.onmessage = event => { const port = event.ports[0]; if (!port) return void 0; };
那么紧接着我们就需要使用TransformStream
进行数据的读写了,由于TransformStream
本身同样是可转移对象,因此我们可以将其直接定义在主线程中,然后在初始化文件下载时,将readable
端传递到Service Worker
中,并将其作为下载的ReadableStream
实例构造Response
对象。那么接下来在主线程创建iframe
触发下载行为之后,我们就可以在Fetch Event
中从map
中读取readable
了。
// packages/webrtc/client/worker/event.ts const ts = new TransformStream(); WorkerEvent.channel.port1.postMessage( { key: MESSAGE_TYPE.TRANSFER_START, id: fileId, readable: ts.readable, } as MessageType, [ts.readable] ); WorkerEvent.writer.set(fileId, ts.writable.getWriter()); // 构造 iframe 触发下载行为 // ... // packages/webrtc/client/worker/index.ts port.onmessage = event => { const payload = event.data as MessageType; if (!payload) return void 0; if (payload.key === MESSAGE_TYPE.TRANSFER_START) { const { id, readable } = payload; map.set(id, [readable]); } }; // 在触发下载行为后 从 map 中读取 readable // ...
在主线程中,我们关注的是内容的写入,以及内置的背压控制,由于TransformStream
本身内部实现的队列以及背压控制,我们就不需要太过于关注数据生产造成的问题,因为在先前我们实现的WebRTC
下载的反馈链路是完善的,我们在这里只需要借助await
控制写入速度即可。在这里有趣的是,即使TransformStream
的readable
与writable
两端现在是运行在两个上下文环境中,其依然能够进行数据读写以及背压控制。
// packages/webrtc/client/worker/event.ts const writer = WorkerEvent.writer.get(fileId); if (!writer) return void 0; // 感知 BackPressure 需要主动 await ready await writer.ready; return writer.write(new Uint8Array(data));
那么在数据块的数量即total
的最后一个块完成传输后,我们就需要将整个传输行为进行回收。首先是TransformStream
的writable
端需要关闭,这个Writer
必须主动调度关闭方法,否则浏览器无法感知下载完成,会一直处于等待下载完成的状态,其次就是我们需要将创建的iframe
从body
上回收,在Service Worker
中我们也需要将map
中的数据进行清理,避免先前的链接还能够响应等问题。
// packages/webrtc/client/worker/event.ts const iframe = document.getElementById(fileId); iframe && iframe.remove(); WorkerEvent.channel?.port1.postMessage({ key: MESSAGE_TYPE.TRANSFER_CLOSE, id: fileId, } as MessageType); const writer = WorkerEvent.writer.get(fileId); writer?.close(); WorkerEvent.writer.delete(fileId); // packages/webrtc/client/worker/index.ts port.onmessage = event => { const payload = event.data as MessageType; if (!payload) return void 0; if (payload.key === MESSAGE_TYPE.TRANSFER_CLOSE) { const { id } = payload; map.delete(id); } };
兼容考量
在现代浏览器中Service Worker
、Fetch API
、Stream API
都已经得到了比较良好的支持,在这里我们使用到的相对最新特性TransformStream
的兼容性也是不错的,在2022
年后发布的浏览器版本基本得到了支持,然而如果我们在MDN
的TransformStream
兼容性中仔细观察一下,则会发现TransformStream
作为transferable
在Safari
中至今还未支持。
那么在这里会造成什么问题呢,我们可以注意到在先前TRANSFER_START
的时候,我们是将TransformStream
的readable
端作为Transferable Object
传递到Service Worker
中,那么此时由于Safari
不支持这个行为,我们的ReadableStream
自然就无法传递到Service Worker
中,因此我们后续的下载行为就无法继续了,因此如果需要兼容Safari
的情况下,我们需要处理这个问题。
这个问题的原因是我们无法将ReadableStream
转移所有权到Service Worker
中,因此可以想到的比较简单的办法就是直接在Service Worker
中定义ReadableStream
即可。也就是说,当传输开始时,我们实例化ReadableStream
并且保存其控制器对象,当数据传递的时候,我们直接将数据块enqueue
到缓冲队列中,而在传输结束时,我们直接调用controller.close()
方法即可,而这个readable
对象我们就可以直接作为请求拦截的Response
响应为下载内容。
let controller: ReadableStreamDefaultController | null = null; const readable = new ReadableStream({ start(ctr) { controller = ctr; }, cancel(reason) { console.log("ReadableStream Aborted", reason); }, }); map.set(fileId, [readable, controller!, Number(fileTotal)]); self.onmessage = event => { const data = event.data as BufferType; destructureChunk(data).then(({ id, series, data }) => { const stream = map.get(id); if (!stream) return void 0; const [, controller, size] = stream; controller.enqueue(new Uint8Array(data)); if (series === size - 1) { controller.close(); map.delete(id); } }); };
那么在这里我们就会意识到先前我们聊到的背压问题,由于在这里我们没有任何背压的反馈机制,而是仅仅将主线程的数据块全部接收并且enqueue
到ReadableStream
中,那么在数据传输速度比浏览器控制的下载IO
速度快的情况下,很容易就会出现数据积压的情况。因此我们就需要想办法实现背压的控制,那么我们就可以比较容易地想到下面的方式。
- 在实例化
ReadableStream
对象的时候,我们借助CountQueuingStrategy
创建足够大的缓冲区,因为本身在传输的过程中我们已经得知了整个文件的大小以及分块的数量等信息,因此创建足够大的缓冲区是可行的。当然我们可能也没必要创建等同于分块数量大小的缓冲区,我们可以将其除2
取整或者取对数都可以,毕竟下载的时候也通过写硬盘在不断消费的。 - 在实例化
ReadableStream
时传递的underlyingSource
对象中,除了start
方法外实际上还有pull
方法,当流的内部数据块队列未满时将会被反复调用,直到达到其高水印,我们则可以通过这个方法的调用作为事件驱动的机制来控制流的频率,需要注意的是只有在其至少入队一个数据块才会被反复调用,如果在pull
函数调用的时候没有实际入队块,则不会被重复调用。
我们在这里首先来看一下分配足够大的缓冲队列的问题,如果深入思考一下,即使分配了足够大的缓冲区,我们实际上并没有实现任何反馈机制去控制减缓数据的生产环节,那么这个缓冲区即使足够大也并没有解决我们的内存溢出问题,虽然即使实例化时分配了足够大的缓冲,也不会立即分配这么大的内存。那么此时即使我们不分配那么大的缓冲区,以默认模式实现的队列也是完全一样的,只是其内部的desiredSize
会变成比较大的负值,而数据也并没有实际丢失,因为此时浏览器的流实现会将数据存储在内存中,直到消费方读取为止。
那么我们再来看一下第二个实现,通过pull
方法我们确实可以获得来自ReadableStream
的缓冲队列反馈,那么我们就可以简单实现一个控制流的方式,考虑到我们会有两种状态,即生产大于消费以及消费大于生产,那么我们就不能单纯的在pull
的时候再拉取数据,我们应该在内部再实现一个缓冲队列,而我们的事件驱动置入数据应该有两部分,分别是缓冲队列置入数据时需要检查是否上次拉取的数据没有成功而是在等待,此时需要调度上次pull
时未完成的Promise
,也就是消费大于生产的情况,还有一个事件是pull
时直接检查缓冲队列是否有数据,如果有则直接置入数据,也就是生产大于消费的情况。
const pending = new WeakMap<ReadableStream, (stream: string) => void>(); const queue = ["1", "2", "3", "4"]; const strategy = new CountQueuingStrategy({ highWaterMark: 3 }); const underlyingSource: UnderlyingDefaultSource<string> = { pull(controller) { if (!queue.length) { console.log("Pull Pending"); return new Promise<void>(resolve => { const handler = (stream: string) => { controller.enqueue(stream); pending.delete(readable); console.log("Pull Restore", stream); resolve(); }; pending.set(readable, handler); }); } const next = queue.shift(); controller.enqueue(next); console.log("Pull", next); return void 0; }, }; const readable = new ReadableStream<string>(underlyingSource, strategy); const write = (stream: string) => { if (pending.has(readable)) { console.log("Write Pending Pull", stream); pending.get(readable)!(stream); } else { console.log("Write Queue", stream); queue.push(stream); } }; // 使得读取任务后置 先让 pull 将 Readable 缓冲队列拉满 setTimeout(async () => { // 此时 queue 队列中还存在数据 生产大于消费 const reader = readable.getReader(); console.log("Read Twice"); // 读取后 queue 队列中数据已经读取完毕 消费等于生产 console.log("Read", await reader.read()); // 读取后 queue 队列为空 Readable 缓冲队列未满 // 之后 Readable 仍然发起 pull 事件 消费大于生产 console.log("Read", await reader.read()); console.log("Write Twice"); // 写入挂起的 pull 任务 消费等于生产 write("5"); // 写入 queue 队列 生产大于消费 write("6"); }, 100); // Pull 1 // Pull 2 // Pull 3 // Read Twice // Pull 4 // Read {value: '1', done: false} // Pull Pending // Read {value: '2', done: false} // Write Twice // Write Pending Pull 5 // Pull Restore 5 // Write Queue 6
看起来我们实现了非常棒的基于pull
的缓冲队列控制,但是我们仔细研究一下会发现我们似乎忽略了什么,我们是不是仅仅是将ReadableStream
内置的缓冲队列提出来到了外边,实际上我们还是会面临内存压力,只不过这里的数据积压的位置从ReadableStream
转移到了我们自己定义的数组之后,我们似乎完全没有解决问题。
那么我们再来思考一下问题到底是出在哪里,当我们使用TransformStream
的时候我们的背压控制似乎仅仅是await writer.ready
就实现了,那么这里究竟意味着什么,我们可以很明显地思考出来这里是携带者反馈机制的,也就是说当其认为内部的队列承压之后,会主动阻塞生产者的数据生产,而我们的实现中似乎并没有从Service Worker
到主线程的反馈机制,因此我们才没有办法处理背压问题。
那么我们再看得本质一些,我们的通信方式是postMessage
,那么在这里的问题是什么呢,或者是说如果我们想在主线程使用await
的方式直接控制背压的话,我们缺乏的是什么,很明显是因为我们没有办法获得传输后事件的响应,那么在这里因为postMessage
是单向通信的,我们没有办法做到postMessage().then()
这样的操作,甚至于我们可以在postMessage
之后立即置ready
为挂起的Promise
,等待响应数据的resolve
,由此就可以做到类似的操作了。
这个操作并不复杂,那么我们可不可以将其做的更通用一些,类似于fetch
的实现,当我们发起一个请求/推送后,我们可以借助Promise
在一定时间内甚至一直等待其对应的响应,而由于我们的postMessage
是单向的数据传输,我们就需要在数据的层面上增加id
标识,以便于我们可以得知当前的响应究竟应该resolve
哪个Promise
。
考虑到这里,我们就需要处理数据的传输问题,也就是说由于我们需要对原始的数据中追加标识信息并不是一件容易的事,在postMessage
中如果是字符串数据我们可以直接再构造一层对象,然而如果是ArrayBuffer
数据的话,我们就需要操作其本身的Buffer
,这显然是有些费劲的。因此我希望能够有一些简单的办法将其序列化,然后就可以以字符串的形式进行传输了,在这里我考虑了BASE64
、Uint8Array
、Uint32Array
的序列化方式。
我们就以最简单的8
个字节为例,分别计算一下序列化之后的BASE64
、Uint8Array
、Uint32Array
体积问题。如果我们此时数据的每位都是0
的话,分别计算出的编码结果为AAAAAAAAAAA=
、[0,0,0,0,0,0,0,0]
、[0,0]
,占用了12
字符、17
字符、5
字符的体积。
const buffer = new ArrayBuffer(8); const input = new Uint8Array(buffer); input.fill(0); const binaryStr = String.fromCharCode.apply(null, input); console.log("BASE64", btoa(binaryStr)) ; // AAAAAAAAAAA= const uint8Array = new Uint8Array(input); console.log("Uint8Array", uint8Array); // Uint8Array(8) [0, 0, 0, 0, 0, 0, 0, 0] const uint32Array = new Uint32Array(input.buffer); console.log("Uint32Array", uint32Array); // Uint32Array(2) [0, 0]
在上边的结果中我们看起来是Uint32Array
的序列化结果最好,然而这是我们上述所有位都填充为0
的情况,然而在实际的传输过程中肯定是没有这么理想的,那么我们再举反例,将其全部填充为1
来测试效果。此时的结果就变得不一样了,分别计算出的编码结果为//////////8=
、[255,255,255,255,255,255,255,255]
、[4294967295,4294967295]
,占用了12
字符、33
字符、23
字符的体积。
const buffer = new ArrayBuffer(8); const input = new Uint8Array(buffer); input.fill(0); const binaryStr = String.fromCharCode.apply(null, input); console.log("BASE64", btoa(binaryStr)) ; // //////////8= const uint8Array = new Uint8Array(input); console.log("Uint8Array", uint8Array); // Uint8Array(8) [255, 255, 255, 255, 255, 255, 255, 255] const uint32Array = new Uint32Array(input.buffer); console.log("Uint32Array", uint32Array); // Uint32Array(2) [4294967295, 4294967295]
这么看起来,还是BASE64
的序列化结果比较稳重,因为其本身就是按位的编码方式,其会将每6 bits
编码共64
按照索引取数组中的字符,这样就变成了每3
个字节即24 bits
会编码为4
个字符变成32 bits
,而此时我们有8
个字节也就是64 bits
,不能够被24 bits
完全整除,那么此时我们先处理前6
个字节,如果全位都是0
的话,那么前8
个字符就全部是A
,而此时我们还剩下16 bits
,那么我们就填充8 bits
将其凑为24 bits
,然后再编码为4
个字符(最后6 bits
由=
填充),因此最终的结果就是12
个字符。
然而在这里我发现是我想多了,实际上我们并不需要考虑序列化的编码问题,在我们的RTC DataChannel
确实是必须要纯字符串或者是ArrayBuffer
等数据,不能直接传输对象,但是在postMessage
中我们可以传递的数据是由The Structured Clone Algorithm
算法控制的,而ArrayBuffer
对象也是赫然在列的,而且也不需要借助transfer
能力来实现所有权问题,其会实际执行内置的序列化方法。在我的实际测试中Chrome
、Firefox
、Safari
都是支持这种直接的数据传输的,这里的传输毕竟都是在同一浏览器中进行的,其数据传输可以更加宽松一些。
<!-- index.html --> <script> navigator.serviceWorker.register("./sw.js", { scope: "./" }).then(res => { window.sw = res; }) </script>
// sw.js self.onmessage = (event) => { console.log("Message", event); self.message = event; }; // 控制台执行 观察 SW 的数据响应以及值 const buffer = new ArrayBuffer(8); const input = new Uint8Array(buffer); input.fill(255); sw.active.postMessage({ id: "test", buffer })
那么我们对于需要从Service Worker
响应的数据实现就简单很多了,毕竟我们现在只需要将其当作普通的对象处理就可以了,也不需要考虑任何序列化的问题。此时我们就利用好Promise
的特性,当接收到postMessage
响应的时候,从全局的存储中查找当前id
对应的resolve
,并且将携带的数据作为参数执行即可,至此我们就可以很方便地进行背压的反馈了,我们同样也可以加入一些超时机制等避免resolve
的积压。
// 模拟 onMessage 方法 let onMainMessage: ((event: { id: string; payload: string }) => void) | null = null; let onWorkerMessage: ((event: { id: string; payload: string }) => void) | null = null; // 模拟 postMessage 方法 const postToWorker = (id: string, payload: string) => { onWorkerMessage?.({ id, payload }); }; const postToMain = (id: string, payload: string) => { onMainMessage?.({ id, payload }); }; // Worker (() => { onWorkerMessage = ({ id, payload }) => { console.log("Worker Receive", id, payload); setTimeout(() => { postToMain(id, "pong"); }, 1000); }; })(); // Main (() => { const map = new Map<string, (value: { id: string; payload: string }) => void>(); onMainMessage = ({ id, payload }) => { const resolve = map.get(id); resolve?.({ id, payload }); map.delete(id); }; const post = (payload: string) => { const id = Math.random().toString(36).slice(2); return new Promise<{ id: string; payload: string }>(resolve => { map.set(id, resolve); postToWorker(id, payload); }); }; post("ping").then(res => { console.log("Main Receive", res.id, res.payload); }); })();
每日一题
https://github.com/WindrunnerMax/EveryDay
参考
https://juejin.cn/post/6844904029244358670 https://github.com/jimmywarting/StreamSaver.js https://github.com/jimmywarting/native-file-system-adapter https://developer.mozilla.org/zh-CN/docs/Web/API/FetchEvent https://nodejs.org/docs/latest/api/stream.html#types-of-streams https://developer.mozilla.org/en-US/docs/Web/API/TransformStream https://help.aliyun.com/zh/oss/user-guide/map-custom-domain-names-5 https://developer.mozilla.org/zh-CN/docs/Web/HTML/Element/a#download https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Headers/Content-Disposition https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Concepts#backpressure https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm