Cet article examinera l'utilisation de l'API JavaScript Streams qui permet d'effectuer un appel HTTP de récupération et de recevoir une réponse en streaming par morceaux, ce qui permet à un client de commencer à répondre davantage à une réponse du serveur. rapidement et créez des interfaces utilisateur comme ChatGPT.
À titre d'exemple motivant, nous implémenterons une fonction pour gérer la réponse LLM en streaming d'OpenAI (ou de tout serveur utilisant la même API de streaming http), en n'utilisant aucune dépendance npm, juste la récupération intégrée. Le code complet est ici, y compris les tentatives avec interruption exponentielle, les intégrations, le chat sans streaming et des API plus simples pour interagir avec les complétions et les intégrations de chat.
Si vous souhaitez savoir comment renvoyer également un flux HTTP aux clients, consultez cet article.
Voici l'exemple complet. Nous examinerons chaque pièce ci-dessous :
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Voir le code ici pour une version qui comporte de belles surcharges typées pour les variantes de paramètres de streaming et non-streaming, ainsi que des tentatives et d'autres améliorations.
Le reste de l'article vise à comprendre ce que fait ce code.
Faire la demande
Cette partie est en fait très simple. Une réponse HTTP en streaming provient d'une requête HTTP normale :
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });Les en-têtes HTTP sont envoyés comme d'habitude et ne nécessitent rien de particulier pour activer le streaming. Et vous pouvez toujours exploiter les en-têtes de mise en cache classiques pour le streaming HTTP.
Gestion des erreurs
L'histoire des erreurs côté client est un peu malheureuse pour le streaming HTTP. L'avantage est que pour le streaming HTTP, le client obtient immédiatement les codes d'état dans la réponse initiale et peut y détecter un échec. L'inconvénient du protocole http est que si le serveur renvoie un succès mais s'interrompt ensuite en cours de flux, rien au niveau du protocole n'indiquera au client que le flux a été interrompu. Nous verrons ci-dessous comment OpenAI encode une sentinelle « tout est fait » à la fin pour contourner ce problème.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }Lire le flux
Afin de lire une réponse de streaming HTTP, le client peut utiliser la propriété Response.body qui est un ReadableStream vous permettant de parcourir les morceaux à mesure qu'ils proviennent du serveur à l'aide de la méthode .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }Cela gère chaque bit de données que nous récupérons, mais pour le protocole HTTP OpenAI, nous nous attendons à ce que les données soient JSON séparées par des nouvelles lignes, nous allons donc diviser le corps de la réponse et « céder » chaque ligne au fur et à mesure. re terminé. Nous tamponnons la ligne en cours dans lastFragment et renvoyons uniquement les lignes complètes séparées par deux nouvelles lignes :
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Si cette fonction* et la syntaxe de rendement ne vous sont pas familières, traitez simplement function* comme une fonction qui peut renvoyer plusieurs éléments dans une boucle, et rendement comme moyen de renvoyer quelque chose plusieurs fois à partir d'une fonction.
Vous pouvez ensuite parcourir cette fonction splitStream comme :
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }Si cette syntaxe "for wait" vous déstabilise, elle utilise ce qu'on appelle un "itérateur asynchrone" - comme un itérateur normal que vous utiliseriez avec une boucle for, mais chaque fois qu'il obtient la valeur suivante, elle est attendue.
Pour notre exemple, lorsque nous avons obtenu du texte d'OpenAI et que nous en attendons plus, la boucle for attendra que splitStream produise une autre valeur, ce qui se produira lorsque wait reader.read() renvoie une valeur qui se termine une ou plusieurs lignes de texte.
Ensuite, nous examinerons une autre façon de renvoyer un itérateur asynchrone qui n'est pas une fonction comme splitStream, afin qu'un appelant puisse utiliser une boucle « for wait » pour parcourir ces données.
Renvoyer un itérateur asynchrone
Maintenant que nous avons un itérateur asynchrone renvoyant des lignes complètes de texte, nous pourrions simplement renvoyer splitStream(response.body), mais nous voulons intercepter chacune des lignes et les transformer, tout en laissant l'appelant de notre fonction itérer .
L'approche est similaire à la syntaxe de la fonction asynchrone* ci-dessus. Ici, nous renverrons directement un itérateur asynchrone, au lieu d'une fonction asynchrone qui en renvoie un lorsqu'elle est appelée. La différence est que le type est AsyncIterator au lieu de AsyncGenerator qui doit être appelé en premier. Un AsyncIterator peut être défini en ayant une certaine fonction nommée : Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };Ceci est utile lorsque vous souhaitez renvoyer quelque chose de différent des données provenant de splitStream. Chaque fois qu'une nouvelle ligne provient de la requête HTTP de streaming, splitStream la donnera, cette fonction la recevra sous forme de données et pourra faire quelque chose avant de la céder à son appelant.
Nous verrons ensuite comment interpréter ces données spécifiquement dans le cas de l'API de complétion de chat en streaming d'OpenAI.
Gestion du protocole de streaming HTTP OpenAI
Le protocole de réponse OpenAI est une série de lignes qui commencent par data: ou event:, mais nous nous contenterons de gérer les réponses aux données, car c'est la partie utile pour terminer le chat. Il y a une sentinelle de [DONE] si le flux est terminé, sinon c'est juste du JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }Rassembler tout cela
Maintenant que vous comprenez le streaming HTTP, vous pouvez travailler en toute confiance directement avec les API de streaming sans compter sur des SDK ou des bibliothèques. Cela vous permet de masquer la latence, car votre interface utilisateur peut immédiatement commencer la mise à jour, sans consommer plus de bande passante avec plusieurs requêtes. Vous pouvez utiliser la fonction ci-dessus comme vous le feriez avec le package officiel openai npm :
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }Voir le code ici qui vous permet également de créer certaines fonctions utilitaires pour rendre cela encore plus facile en préconfigurant le modèle et en extrayant le .choices[0].delta.content :
const response = await chatStream(messages); for await (const content of response) { console.log(content); }Avant de copier le code, essayez de l'implémenter vous-même sous forme d'exercice de fonctions asynchrones.
Davantage de ressources
- Pour plus d'informations sur le renvoi des données de streaming HTTP à partir de votre propre point de terminaison de serveur, consultez cet article sur AI Chat avec HTTP Streaming qui diffuse à la fois les données d'OpenAI (ou similaire) vers votre serveur et les diffuse simultanément vers un client, tout en faisant logique personnalisée au fur et à mesure (comme l'enregistrement de morceaux dans une base de données).
- Les documents MDN, comme toujours, sont excellents. Au-delà des liens ci-dessus, voici un guide sur l'API des flux lisibles qui montre comment connecter un flux lisible à une balise pour diffuser une demande d'image. Remarque : ce guide utilise Response.body comme itérateur asynchrone, mais actuellement, cela n'est pas largement implémenté et pas dans les types TypeScript.
Remarque : vous ne pouvez avoir qu'un seul lecteur du flux à la fois, vous n'appelez donc généralement pas .getReader() plusieurs fois - vous voulez probablement .tee() dans ce cas, et si vous souhaitez utiliser . getReader() plusieurs fois pour une raison quelconque, assurez-vous d'avoir le premier .releaseLock() en premier. ↩
Ou bien vous pouvez le faire. Si vous n'êtes pas familier avec Symbol, il est utilisé d'une manière pour avoir des clés dans un objet qui ne sont pas des chaînes ou des nombres. De cette façon, ils n'entrent pas en conflit si vous ajoutez une clé nommée asyncIterator. Vous pouvez accéder à la fonction avec myIterator[Symbol.asyncIterator](). ↩
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3