Esta postagem analisará como trabalhar com a API JavaScript Streams, que permite fazer uma chamada HTTP de busca e receber uma resposta de streaming em partes, o que permite que um cliente comece a responder mais a uma resposta do servidor rapidamente e crie UIs como ChatGPT.
Como exemplo motivador, implementaremos uma função para lidar com a resposta de streaming LLM do OpenAI (ou qualquer servidor usando a mesma API de streaming http), sem usar dependências npm - apenas a busca integrada. O código completo está aqui, incluindo novas tentativas com espera exponencial, incorporações, bate-papo sem streaming e APIs mais simples para interagir com conclusões e incorporações de bate-papo.
Se você estiver interessado em saber como também retornar um stream HTTP para clientes, confira esta postagem.
Aqui está o exemplo completo. Veremos cada peça abaixo:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Veja o código aqui para uma versão que possui boas sobrecargas digitadas para variantes de parâmetros de streaming e não streaming, junto com novas tentativas e outras melhorias.
O resto da postagem é sobre como entender o que esse código faz.
Fazendo o pedido
Esta parte é realmente muito fácil. Uma resposta HTTP de streaming vem de uma solicitação HTTP normal:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });Os cabeçalhos HTTP são enviados normalmente e não é necessário definir nada em particular para ativar o streaming. E você ainda pode aproveitar cabeçalhos de cache regulares para streaming HTTP.
Tratamento de erros
A história sobre erros no lado do cliente é um pouco infeliz para streaming HTTP. A vantagem é que, para streaming HTTP, o cliente obtém códigos de status imediatamente na resposta inicial e pode detectar falhas. A desvantagem do protocolo http é que se o servidor retornar sucesso, mas depois interromper o fluxo, não há nada no nível do protocolo que informe ao cliente que o fluxo foi interrompido. Veremos abaixo como o OpenAI codifica uma sentinela “tudo pronto” no final para contornar isso.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }Lendo o fluxo
Para ler uma resposta de streaming HTTP, o cliente pode usar a propriedade response.body que é um ReadableStream que permite iterar sobre os pedaços conforme eles chegam do servidor usando o método .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }Isso lida com todos os dados que recebemos de volta, mas para o protocolo OpenAI HTTP esperamos que os dados sejam JSON separados por novas linhas, então, em vez disso, dividiremos o corpo da resposta e “renderemos” cada linha conforme eles ' está concluído. Armazenamos a linha em andamento em lastFragment e retornamos apenas linhas completas que foram separadas por duas novas linhas:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Se esta função* e a sintaxe de rendimento não são familiares para você, apenas trate a função* como uma função que pode retornar várias coisas em um loop e o rendimento como a forma de retornar algo várias vezes de uma função.
Você pode então fazer um loop sobre esta função splitStream como:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }Se essa sintaxe "for await" confunde você, ele está usando o que é chamado de "iterador assíncrono" - como um iterador normal que você usaria com um loop for, mas toda vez que obtém o próximo valor, ele é aguardado.
Para nosso exemplo, quando obtivermos algum texto do OpenAI e estivermos esperando por mais, o loop for esperará até que splitStream produza outro valor, o que acontecerá quando await reader.read() retornar um valor que termine uma ou mais linhas de texto.
A seguir, veremos outra maneira de retornar um iterador assíncrono que não é uma função como splitStream, para que um chamador possa usar um loop “for await” para iterar sobre esses dados.
Retornando um iterador assíncrono
Agora que temos um iterador assíncrono retornando linhas completas de texto, poderíamos simplesmente retornar splitStream(response.body), mas queremos interceptar cada uma das linhas e transformá-las, enquanto ainda deixamos o chamador de nossa função iterar .
A abordagem é semelhante à sintaxe da função assíncrona* acima. Aqui retornaremos um iterador assíncrono diretamente, em vez de uma função assíncrona que retorna um quando é chamada. A diferença é que o tipo é AsyncIterator em vez de AsyncGenerator que precisa ser chamado primeiro. Um AsyncIterator pode ser definido tendo uma determinada função nomeada: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };Isso é útil quando você deseja retornar algo diferente dos dados provenientes do splitStream. Cada vez que uma nova linha chega da solicitação HTTP de streaming, splitStream irá produzi-la, esta função irá recebê-la em dados e pode fazer algo antes de entregá-la ao seu chamador.
A seguir, veremos como interpretar esses dados especificamente no caso da API de conclusão de chat de streaming da OpenAI.
Lidando com o protocolo de streaming HTTP OpenAI
O protocolo de resposta OpenAI é uma série de linhas que começam com dados: ou evento:, mas trataremos apenas das respostas de dados, já que essa é a parte útil para conclusões de bate-papo. Há uma sentinela de [DONE] se o stream estiver concluído, caso contrário, é apenas JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }Juntando tudo
Agora que você entende o streaming HTTP, pode se sentir confiante ao trabalhar diretamente com APIs de streaming sem depender de SDKs ou bibliotecas. Isso permite ocultar a latência, pois sua UI pode começar a atualizar imediatamente, sem consumir mais largura de banda com múltiplas solicitações. Você pode usar a função acima como faria com o pacote oficial openai npm:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }Veja aqui o código que também permite criar algumas funções utilitárias para tornar isso ainda mais fácil pré-configurando o modelo e extraindo o .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }Antes de copiar o código, tente implementá-lo você mesmo como um exercício de funções assíncronas.
Mais recursos
- Para obter informações sobre como retornar dados de streaming HTTP do seu próprio endpoint de servidor, confira esta postagem no AI Chat com HTTP Streaming que transmite dados do OpenAI (ou similar) para o seu servidor e simultaneamente os transmite para um cliente, enquanto faz lógica personalizada à medida que avança (como salvar pedaços em um banco de dados).
- Os documentos do MDN, como sempre, são ótimos. Além dos links acima, aqui está um guia sobre a API de fluxos legíveis que mostra como conectar um fluxo legível a uma tag para transmitir em uma solicitação de imagem. Nota: este guia usa response.body como um iterador assíncrono, mas atualmente isso não é amplamente implementado e nem nos tipos TypeScript.
Nota: você só pode ter um leitor do stream por vez, então geralmente não chama .getReader() várias vezes - você provavelmente deseja .tee() nesse caso, e se quiser usar . getReader() várias vezes por algum motivo, certifique-se de ter o primeiro .releaseLock() primeiro. ↩
Ou, alternativamente, você pode. Se você não estiver familiarizado com o Symbol, ele é usado de forma a ter chaves em um objeto que não são strings ou números. Dessa forma, eles não entrarão em conflito se você adicionar uma chave chamada asyncIterator. Você pode acessar a função com myIterator[Symbol.asyncIterator](). ↩
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3