Esta publicación analizará cómo trabajar con JavaScript Streams API, que permite realizar una llamada HTTP de recuperación y recibir una respuesta de transmisión en fragmentos, lo que permite que un cliente comience a responder a una respuesta del servidor más rápidamente y cree interfaces de usuario como ChatGPT.
Como ejemplo motivador, implementaremos una función para manejar la respuesta LLM de transmisión desde OpenAI (o cualquier servidor que use la misma API de transmisión http), sin usar dependencias de npm, solo la búsqueda incorporada. El código completo está aquí, incluidos reintentos con retroceso exponencial, incrustaciones, chat sin transmisión y API más simples para interactuar con finalizaciones e incrustaciones de chat.
Si estás interesado en ver cómo devolver también un flujo HTTP a los clientes, consulta esta publicación.
Aquí está el ejemplo completo. Veremos cada pieza a continuación:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Consulte el código aquí para ver una versión que tiene sobrecargas escritas agradables para variantes de parámetros de transmisión y no transmisión, junto con reintentos y otras mejoras.
El resto de la publicación trata sobre comprender qué hace este código.
haciendo la solicitud
Esta parte es realmente muy fácil. Una respuesta HTTP de transmisión proviene de una solicitud HTTP normal:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });Los encabezados HTTP se envían como de costumbre y no es necesario configurar nada en particular para habilitar la transmisión. Y aún puedes aprovechar los encabezados de almacenamiento en caché habituales para la transmisión HTTP.
Errores de manejo
La historia sobre los errores en el lado del cliente es un poco desafortunada para la transmisión HTTP. La ventaja es que para la transmisión HTTP, el cliente obtiene códigos de estado inmediatamente en la respuesta inicial y puede detectar fallas allí. La desventaja del protocolo http es que si el servidor tiene éxito pero luego se interrumpe a mitad de la transmisión, no hay nada a nivel de protocolo que le indique al cliente que la transmisión fue interrumpida. Veremos a continuación cómo OpenAI codifica un centinela de "todo listo" al final para solucionar este problema.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }leyendo la corriente
Para leer una respuesta de transmisión HTTP, el cliente puede usar la propiedad Response.body, que es un ReadableStream que le permite iterar sobre los fragmentos a medida que ingresan desde el servidor usando el método .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }Esto maneja todos los datos que recibimos, pero para el protocolo HTTP OpenAI esperamos que los datos sean JSON separados por nuevas líneas, por lo que dividiremos el cuerpo de la respuesta y "produciremos" cada línea a medida que aparecen. re completado. Almacenamos la línea en progreso en lastFragment y solo devolvemos líneas completas que han sido separadas por dos nuevas líneas:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Si esta función* y la sintaxis de rendimiento no te resultan familiares, simplemente trata la función* como una función que puede devolver varias cosas en un bucle y rendimiento como la forma de devolver algo varias veces desde una función.
Luego puedes recorrer esta función splitStream como:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }Si esta sintaxis "for await" te confunde, está usando lo que se llama un "iterador asíncrono", como un iterador normal que usarías con un bucle for, pero cada vez que obtiene el siguiente valor, se espera.
Para nuestro ejemplo, cuando recibimos algo de texto de OpenAI y estamos esperando más, el bucle for esperará hasta que splitStream produzca otro valor, lo que sucederá cuando await Reader.read() devuelva un valor que finalice. una o más líneas de texto.
A continuación veremos otra forma de devolver un iterador asíncrono que no sea una función como splitStream, de modo que una persona que llama pueda usar un bucle "for await" para iterar sobre estos datos.
Devolver un iterador asíncrono
Ahora que tenemos un iterador asíncrono que devuelve líneas completas de texto, podríamos simplemente devolver splitStream(response.body), pero queremos interceptar cada una de las líneas y transformarlas, al tiempo que permitimos que la persona que llama a nuestra función itere. .
El enfoque es similar a la sintaxis de la función asíncrona* anterior. Aquí devolveremos un iterador asíncrono directamente, en lugar de una función asíncrona que devuelve uno cuando se llama. La diferencia es que el tipo es AsyncIterator en lugar de AsyncGenerator, que debe llamarse primero. Un AsyncIterator se puede definir teniendo una determinada función con nombre: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };Esto es útil cuando desea devolver algo diferente a los datos provenientes de splitStream. Cada vez que llega una nueva línea de la solicitud HTTP de transmisión, splitStream la entregará, esta función la recibirá en datos y podrá hacer algo antes de entregársela a la persona que llama.
A continuación veremos cómo interpretar estos datos específicamente en el caso de la API de finalización de chat en streaming de OpenAI.
Manejo del protocolo de transmisión HTTP OpenAI
El protocolo de respuesta de OpenAI es una serie de líneas que comienzan con datos: o evento:, pero solo manejaremos las respuestas de datos, ya que esa es la parte útil para completar el chat. Hay un centinela de [DONE] si la transmisión finaliza; de lo contrario, es solo JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }Reuniéndolo todo
Ahora que comprende la transmisión HTTP, puede sentirse seguro al trabajar directamente con las API de transmisión sin depender de SDK o bibliotecas. Esto le permite ocultar la latencia, ya que su interfaz de usuario puede comenzar a actualizarse inmediatamente, sin consumir más ancho de banda con múltiples solicitudes. Puede utilizar la función anterior como lo haría con el paquete oficial de openai npm:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }Vea el código aquí que también le permite crear algunas funciones de utilidad para hacerlo aún más fácil al preconfigurar el modelo y extraer los .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }Antes de copiar el código, intenta implementarlo tú mismo como un ejercicio de funciones asíncronas.
Más recursos
- Para obtener información sobre cómo devolver datos de transmisión HTTP desde el punto final de su propio servidor, consulte esta publicación en AI Chat con transmisión HTTP que transmite datos desde OpenAI (o similar) a su servidor y simultáneamente los transmite a un cliente, mientras hace lógica personalizada a medida que avanza (como guardar fragmentos en una base de datos).
- Los documentos de MDN, como siempre, son geniales. Más allá de los enlaces anteriores, aquí hay una guía sobre la API de transmisiones legibles que muestra cómo conectar una transmisión legible a una etiqueta para transmitir en una solicitud de imagen. Nota: esta guía utiliza Response.body como iterador asíncrono, pero actualmente no está ampliamente implementado y no en los tipos de TypeScript.
Nota: solo puedes tener un lector de la transmisión a la vez, por lo que generalmente no llamas a .getReader() varias veces; probablemente quieras .tee() en ese caso, y si quieres usar . getReader() varias veces por algún motivo, asegúrese de tener primero el primer .releaseLock(). ↩
O, alternativamente, puedes hacerlo. Si no estás familiarizado con el símbolo, se usa para tener claves en un objeto que no son cadenas ni números. De esa manera no entran en conflicto si agrega una clave llamada asyncIterator. Podrías acceder a la función con myIterator[Symbol.asyncIterator](). ↩
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3