この投稿では、フェッチ HTTP 呼び出しを実行し、ストリーミング応答をチャンクで受信できるようにする JavaScript Streams API の操作について説明します。これにより、クライアントはサーバー応答への応答を開始できるようになります。 ChatGPT のような UI をすばやく構築します。
やる気を起こさせる例として、npm 依存関係を使用せず、組み込みフェッチのみを使用して、OpenAI (または同じ http ストリーミング API を使用するサーバー) からのストリーミング LLM 応答を処理する関数を実装します。指数関数的バックオフによる再試行、埋め込み、非ストリーミング チャット、チャットの完了と埋め込みを操作するための単純な API を含む完全なコードはここにあります。
HTTP ストリームをクライアントに返す方法にも興味がある場合は、この投稿を参照してください。
これが完全な例です。以下で各部分を見ていきます:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i ストリーミングおよび非ストリーミング パラメーターのバリアントに対する優れた型付きオーバーロード、再試行およびその他の改善を備えたバージョンについては、ここのコードを参照してください。
投稿の残りの部分は、このコードの機能を理解することについてです。
リクエストを行う
この部分は実はとても簡単です。ストリーミング HTTP 応答は通常の HTTP 要求から来ます:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });HTTP ヘッダーは通常どおり送信されるため、ストリーミングを有効にするために特に何も設定する必要はありません。また、HTTP ストリーミングでは通常のキャッシュ ヘッダーを引き続き利用できます。
エラーの処理
クライアント側のエラーに関する話は、HTTP ストリーミングにとって少し残念です。 HTTP ストリーミングの利点は、クライアントが最初の応答でステータス コードをすぐに取得し、そこで障害を検出できることです。 http プロトコルの欠点は、サーバーが成功を返してもストリームの途中で中断した場合、ストリームが中断されたことをクライアントに伝えるものがプロトコル レベルで何もないことです。これを回避するために、OpenAI が最後に「すべて完了」センチネルをエンコードする方法を以下で見ていきます。
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }ストリームを読む
HTTP ストリーミング応答を読み取るために、クライアントは ReadableStream である response.body プロパティを使用できます。これにより、.getReader() メソッドを使用してサーバーから受信したチャンクを反復処理できます。 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }これは返されるデータのすべてのビットを処理しますが、OpenAI HTTP プロトコルの場合、データは改行で区切られた JSON であることが期待されるため、代わりに応答本文を分割し、各行をそのまま「出力」します。再完成しました。進行中の行を lastFragment にバッファリングし、2 つの改行で区切られた完全な行のみを返します:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i この関数* と yield 構文に慣れていない場合は、関数 * をループ内で複数のものを返すことができる関数として扱い、yield は関数から何かを複数回返す方法として扱います。
次に、この SplitStream 関数を次のようにループできます。
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }この「for await」構文が気になる場合は、いわゆる「非同期イテレータ」が使用されています。これは、for ループで使用する通常のイテレータのようなものですが、次の値を取得するたびに待機されます。
この例では、OpenAI からテキストを取得し、さらに待機している場合、for ループは、splitStream が別の値を生成するまで待機します。これは、await Reader.read() が終了値を返したときに発生します。 1 行以上のテキスト。
次に、splitStream のような関数ではない非同期イテレータを返す別の方法を見ていきます。これにより、呼び出し元は「for await」ループを使用してこのデータを反復処理できます。
非同期イテレータを返す
完全なテキスト行を返す非同期イテレータができたので、splitStream(response.body) を返すだけで済みますが、関数の呼び出し元に反復処理をさせながら、各行をインターセプトして変換したいと考えています。 。
このアプローチは、上記の async function* 構文と似ています。ここでは、呼び出されたときに非同期イテレータを返す非同期関数の代わりに、非同期イテレータを直接返します。違いは、型が AsyncGenerator ではなく AsyncIterator であり、最初に呼び出す必要があることです。 AsyncIterator は、特定の名前付き関数 Symbol.asyncIterator.2
を使用して定義できます。return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };これは、splitStream からのデータとは異なるものを返したい場合に便利です。ストリーミング HTTP リクエストから新しい行が入るたびに、splitStream がそれを生成し、この関数がそれをデータで受け取り、呼び出し元に渡す前に何かを行うことができます。
次に、特に OpenAI のストリーミング チャット完了 API の場合にこのデータを解釈する方法を見ていきます。
OpenAI HTTP ストリーミング プロトコルの処理
OpenAI 応答プロトコルは data: またはevent: で始まる一連の行ですが、データ応答のみを処理します。これはチャットの完了に役立つ部分だからです。ストリームが完了すると [DONE] というメッセージが表示されますが、それ以外の場合は単なる JSON です。
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }すべてをひとつにまとめる
HTTP ストリーミングについて理解したので、SDK やライブラリに依存せずに、自信を持ってストリーミング API を直接操作できるようになります。これにより、複数のリクエストでより多くの帯域幅を消費することなく、UI の更新をすぐに開始できるため、遅延を隠すことができます。上記の関数は、公式の openai npm パッケージと同じように使用できます:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }ここのコードを参照してください。このコードでは、モデルを事前に設定して .choices[0].delta.content:
を抽出することで、これをさらに簡単にするいくつかのユーティリティ関数も作成できます。const response = await chatStream(messages); for await (const content of response) { console.log(content); }コードをコピーする前に、非同期関数の演習としてコードを自分で実装してみてください。
その他のリソース
- 独自のサーバー エンドポイントから HTTP ストリーミング データを返す方法については、HTTP ストリーミングを使用した AI チャットに関するこの投稿を確認してください。これは、OpenAI (または類似のもの) からサーバーにデータをストリーミングし、同時にそれをクライアントにストリーミングします。カスタム ロジック (チャンクをデータベースに保存するなど)。
- MDN ドキュメントはいつものように素晴らしいです。上記のリンク以外に、読み取り可能なストリーム API に関するガイドがあり、読み取り可能なストリームを タグに接続して画像リクエストでストリーミングする方法を示しています。注: このガイドでは、response.body を非同期イテレータとして使用しますが、現在、これは広く実装されておらず、TypeScript 型にも含まれていません。
注: ストリームのリーダーは一度に 1 つだけしか持てないので、通常は .getReader() を複数回呼び出すことはありません。その場合、および を使用したい場合は、おそらく .tee() が必要になります。何らかの理由で getReader() を複数回実行する場合は、必ず最初の .releaseLock() を最初に実行してください。 ↩
または、シンボルに慣れていない場合は、オブジェクトに文字列や数値ではないキーを設定する方法で使用できます。こうすることで、asyncIterator という名前のキーを追加した場合でも競合しなくなります。 myIterator[Symbol.asyncIterator]() を使用して関数にアクセスできます。 ↩
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3