这篇文章将着眼于使用 JavaScript Streams API,它允许进行 fetch HTTP 调用并以块的形式接收流响应,这允许客户端开始更多地响应服务器响应快速构建像 ChatGPT 这样的 UI。
作为一个激励性的例子,我们将实现一个函数来处理来自 OpenAI(或任何使用相同 http 流 API 的服务器)的流式 LLM 响应,不使用 npm 依赖项,仅使用内置的 fetch。完整的代码在这里,包括指数退避重试、嵌入、非流式聊天以及用于与聊天完成和嵌入交互的更简单的 API。
如果您有兴趣了解如何将 HTTP 流返回给客户端,请查看这篇文章。
这是完整的示例。我们将看看下面的每一个部分:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i 请参阅此处的代码,了解具有流式和非流式参数变体的良好类型重载的版本,以及重试和其他改进。
这篇文章的其余部分是关于理解这段代码的作用。
提出请求
这部分其实很简单。流式 HTTP 响应来自普通 HTTP 请求:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });HTTP 标头按平常方式发送,无需特别设置任何内容即可启用流式传输。您仍然可以利用常规缓存标头进行 HTTP 流式传输。
处理错误
关于客户端错误的故事对于 HTTP 流来说有点不幸。好处是,对于 HTTP 流式传输,客户端会在初始响应中立即获取状态代码,并可以检测到故障。 http 协议的缺点是,如果服务器返回成功,但随后在流中中断,则协议级别没有任何内容可以告诉客户端流已中断。我们将在下面看到 OpenAI 如何在最后编码“全部完成”哨兵来解决这个问题。
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }读取流
为了读取 HTTP 流响应,客户端可以使用 response.body 属性,该属性是一个 ReadableStream,允许您使用 .getReader() 方法迭代从服务器传入的块。 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }这会处理我们返回的每一位数据,但对于 OpenAI HTTP 协议,我们期望数据是由换行符分隔的 JSON,因此我们将拆分响应正文并“生成”每一行。重新完成。我们将进行中的行缓冲到lastFragment中,并且只返回由两个换行符分隔的完整行:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i 如果你不熟悉这个 function* 和yield 语法,只需将 function* 视为可以在循环中返回多个内容的函数,并将yield视为从函数中多次返回内容的方式。
然后您可以循环此 splitStream 函数,例如:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }如果这个“for wait”语法让您感到困惑,那么它使用了所谓的“异步迭代器”——就像您在 for 循环中使用的常规迭代器一样,但每次它获取下一个值时,都会等待它。
对于我们的示例,当我们从 OpenAI 获取一些文本并且正在等待更多文本时,for 循环将等待,直到 splitStream 产生另一个值,这将在 wait reader.read() 返回一个完成的值时发生一行或多行文本。
接下来我们将研究另一种返回异步迭代器的方法,该迭代器不是 splitStream 等函数,因此调用者可以使用“for wait”循环来迭代此数据。
返回一个异步迭代器
现在我们有一个返回整行文本的异步迭代器,我们可以只返回 splitStream(response.body),但我们希望拦截每一行并转换它们,同时仍然让函数的调用者进行迭代。
该方法类似于上面的 async function* 语法。这里我们将直接返回一个异步迭代器,而不是调用时返回一个的异步函数。不同之处在于类型是 AsyncIterator 而不是需要首先调用的 AsyncGenerator。 AsyncIterator 可以通过某个命名函数来定义:Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };当您想要返回与来自 splitStream 的数据不同的内容时,这非常有用。每次从流式 HTTP 请求中传入新行时,splitStream 都会生成它,该函数将在数据中接收它,并可以在将其生成给调用者之前执行一些操作。
接下来我们将看看如何在 OpenAI 的流式聊天完成 API 的情况下具体解释这些数据。
处理 OpenAI HTTP 流协议
OpenAI 响应协议是一系列以 data: 或 event: 开头的行,但我们只处理数据响应,因为这是完成聊天的有用部分。如果流完成,则有一个 [DONE] 标记,否则它只是 JSON。
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }将所有内容整合在一起
既然您了解了 HTTP 流,您就可以放心地直接使用流 API,而无需依赖 sdk 或库。这使您可以隐藏延迟,因为您的 UI 可以立即开始更新,而不会因为多个请求而消耗更多带宽。您可以像使用官方 openai npm 包一样使用上述功能:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }请参阅此处的代码,它还允许您创建一些实用函数,通过预先配置模型并提取 .choices[0].delta.content:
使这变得更加容易const response = await chatStream(messages); for await (const content of response) { console.log(content); }在复制代码之前,尝试自己实现它作为异步函数的练习。
更多资源
- 有关从您自己的服务器端点返回 HTTP 流数据的信息,请查看关于 AI Chat with HTTP Streaming 的这篇文章,该文章既将数据从 OpenAI(或类似的)流式传输到您的服务器,又同时将其流式传输到客户端,同时执行自定义逻辑(例如将块保存到数据库)。
- MDN 文档一如既往地很棒。除了上面的链接之外,这里还有关于可读流 API 的指南,它展示了如何将可读流连接到 标签以在图像请求中进行流式传输。注意:本指南使用 response.body 作为异步迭代器,但目前尚未广泛实现,并且不在 TypeScript 类型中。
注意:一次只能有一个流的读取器,因此您通常不会多次调用 .getReader() - 在这种情况下您可能需要 .tee() ,并且如果您想使用 .由于某种原因多次 getReader() ,请确保首先拥有第一个 .releaseLock() 。 ↩
或者,如果您不熟悉 Symbol,它的用途是在对象中包含非字符串或数字的键。这样,如果您添加了名为 asyncIterator 的键,它们就不会发生冲突。您可以使用 myIterator[Symbol.asyncIterator]() 访问该函数。 ↩
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3