„Wenn ein Arbeiter seine Arbeit gut machen will, muss er zuerst seine Werkzeuge schärfen.“ – Konfuzius, „Die Gespräche des Konfuzius. Lu Linggong“
Titelseite > Programmierung > Streaming von HTTP-Antworten mithilfe von Fetch

Streaming von HTTP-Antworten mithilfe von Fetch

Veröffentlicht am 31.07.2024
Durchsuche:659

Streaming HTTP Responses using fetch

In diesem Beitrag geht es um die Arbeit mit der JavaScript-Streams-API, die es ermöglicht, einen Fetch-HTTP-Aufruf durchzuführen und eine Streaming-Antwort in Blöcken zu empfangen, wodurch ein Client mehr auf eine Serverantwort reagieren kann schnell und erstellen Sie Benutzeroberflächen wie ChatGPT.

Als motivierendes Beispiel implementieren wir eine Funktion, um die Streaming-LLM-Antwort von OpenAI (oder einem beliebigen Server, der dieselbe http-Streaming-API verwendet) zu verarbeiten, ohne npm-Abhängigkeiten – nur den integrierten Abruf. Der vollständige Code ist hier, einschließlich Wiederholungsversuchen mit exponentiellem Backoff, Einbettungen, Nicht-Streaming-Chat und einfacheren APIs für die Interaktion mit Chat-Abschlüssen und Einbettungen.

Wenn Sie wissen möchten, wie Sie auch einen HTTP-Stream an Clients zurückgeben können, schauen Sie sich diesen Beitrag an.

Vollständiger Beispielcode

Hier ist das vollständige Beispiel. Wir werden uns jedes Stück unten ansehen:

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl   "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer "   process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`,
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment  = data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i 



Sehen Sie sich den Code hier für eine Version an, die schöne typisierte Überladungen für Streaming- und Nicht-Streaming-Parametervarianten sowie Wiederholungsversuche und andere Verbesserungen bietet.

Im Rest des Beitrags geht es darum, zu verstehen, was dieser Code bewirkt.

Die Anfrage stellen

Dieser Teil ist eigentlich sehr einfach. Eine Streaming-HTTP-Antwort stammt von einer normalen HTTP-Anfrage:

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl   "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer "   process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});

Die HTTP-Header werden wie gewohnt gesendet und es müssen keine besonderen Einstellungen vorgenommen werden, um Streaming zu ermöglichen. Und Sie können weiterhin normale Caching-Header für HTTP-Streaming nutzen.

Umgang mit Fehlern

Die Geschichte rund um Fehler auf der Clientseite ist für HTTP-Streaming etwas unglücklich. Der Vorteil besteht darin, dass der Client beim HTTP-Streaming sofort in der ersten Antwort Statuscodes erhält und dort einen Fehler erkennen kann. Der Nachteil des http-Protokolls besteht darin, dass, wenn der Server Erfolg meldet, dann aber mitten im Stream abbricht, es auf Protokollebene nichts gibt, was dem Client mitteilt, dass der Stream unterbrochen wurde. Wir werden unten sehen, wie OpenAI am Ende einen „Alles erledigt“-Sentinel kodiert, um dieses Problem zu umgehen.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`,
}

Den Stream lesen

Um eine HTTP-Streaming-Antwort zu lesen, kann der Client die Eigenschaft „response.body“ verwenden, bei der es sich um einen ReadableStream handelt, mit dem Sie mithilfe der Methode „.getReader()“ über die Blöcke iterieren können, wenn sie vom Server eingehen. 1

const reader = request.body.getReader();
try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      const text = TextDecoder().decode(value);
      //... do something with the chunk
    }
} finally {
  reader.releaseLock();
}

Dies verarbeitet jedes Datenbit, das wir zurückerhalten, aber für das OpenAI-HTTP-Protokoll erwarten wir, dass die Daten durch Zeilenumbrüche getrennte JSON-Daten sind, also teilen wir stattdessen den Antworttext auf und „ergeben“ jede Zeile so, wie sie ist. wieder abgeschlossen. Wir puffern die in Bearbeitung befindliche Zeile in lastFragment und geben nur vollständige Zeilen zurück, die durch zwei Zeilenumbrüche getrennt wurden:

// stream here is request.body
async function* splitStream(stream: ReadableStream) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment  = data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i 



Wenn Ihnen diese Funktions*- und Yield-Syntax nicht vertraut ist, behandeln Sie function* einfach als eine Funktion, die mehrere Dinge in einer Schleife zurückgeben kann, und yield als die Möglichkeit, etwas mehrmals von einer Funktion zurückzugeben.

Sie können diese SplitStream-Funktion dann wie folgt durchlaufen:

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}

Wenn Sie diese „for-await“-Syntax abschreckt, verwendet sie einen sogenannten „asynchronen Iterator“ – wie einen regulären Iterator, den Sie mit einer for-Schleife verwenden würden, aber jedes Mal, wenn er den nächsten Wert erhält, wird darauf gewartet.

Wenn wir in unserem Beispiel Text von OpenAI erhalten haben und auf weiteren Text warten, wartet die for-Schleife, bis splitStream einen anderen Wert liefert eine oder mehrere Textzeilen.

Als nächstes schauen wir uns eine andere Möglichkeit an, einen asynchronen Iterator zurückzugeben, der keine Funktion wie „splitStream“ ist, sodass ein Aufrufer eine „for-await“-Schleife verwenden kann, um über diese Daten zu iterieren.

Gibt einen asynchronen Iterator zurück

Da wir nun einen asynchronen Iterator haben, der vollständige Textzeilen zurückgibt, könnten wir einfach splitStream(response.body) zurückgeben, aber wir möchten jede der Zeilen abfangen und transformieren, während wir dem Aufrufer unserer Funktion weiterhin die Iteration ermöglichen .

Der Ansatz ähnelt der oben genannten Syntax der asynchronen Funktion*. Hier geben wir direkt einen asynchronen Iterator zurück, anstelle einer asynchronen Funktion, die einen zurückgibt, wenn sie aufgerufen wird. Der Unterschied besteht darin, dass der Typ AsyncIterator und nicht AsyncGenerator ist, der zuerst aufgerufen werden muss. Ein AsyncIterator kann durch eine bestimmte benannte Funktion definiert werden: Symbol.asyncIterator.2

      return {
        [Symbol.asyncIterator]: async function* () {
          for await (const data of splitStream(stream)) {
            //handle the data
            yield data;
          }
        },
      };

Dies ist nützlich, wenn Sie etwas anderes als die von splitStream stammenden Daten zurückgeben möchten. Jedes Mal, wenn eine neue Zeile von der Streaming-HTTP-Anfrage eingeht, wird sie von splitStream ausgegeben. Diese Funktion empfängt sie in Daten und kann etwas tun, bevor sie sie an ihren Aufrufer weitergibt.

Als nächstes schauen wir uns an, wie diese Daten speziell im Fall der Streaming-Chat-Vervollständigungs-API von OpenAI interpretiert werden.

Umgang mit dem HTTP-Streaming-Protokoll OpenAI

Das OpenAI-Antwortprotokoll besteht aus einer Reihe von Zeilen, die mit data: oder event: beginnen, aber wir kümmern uns nur um die Datenantworten, da dies der nützliche Teil für den Chat-Abschluss ist. Es gibt einen Wächter von [FERTIG], wenn der Stream fertig ist, andernfalls handelt es sich nur um JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}

Alles zusammenbringen

Da Sie nun HTTP-Streaming verstehen, können Sie sicher sein, direkt mit Streaming-APIs zu arbeiten, ohne auf SDKs oder Bibliotheken angewiesen zu sein. Dadurch können Sie die Latenz ausblenden, da Ihre Benutzeroberfläche sofort mit der Aktualisierung beginnen kann, ohne durch mehrere Anfragen mehr Bandbreite zu verbrauchen. Sie können die obige Funktion wie mit dem offiziellen openai npm-Paket verwenden:

  const response = await createChatCompletion({
    model: "llama3",
    messages: [...your messages...],
    stream: true,
  });
  for await (const chunk of response) {
    if (chunk.choices[0].delta?.content) {
      console.log(chunk.choices[0].delta.content);
    }
  }

Sehen Sie sich hier den Code an, mit dem Sie auch einige Hilfsfunktionen erstellen können, um dies noch einfacher zu machen, indem Sie das Modell vorkonfigurieren und den .choices[0].delta.content extrahieren:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}

Bevor Sie den Code kopieren, versuchen Sie, ihn als Übung in asynchronen Funktionen selbst zu implementieren.

Mehr Ressourcen

  • Informationen zum Zurückgeben von HTTP-Streaming-Daten von Ihrem eigenen Server-Endpunkt finden Sie in diesem Beitrag zu AI Chat mit HTTP-Streaming, das sowohl Daten von OpenAI (oder ähnlichem) an Ihren Server streamt als auch gleichzeitig an einen Client streamt benutzerdefinierte Logik (z. B. Speichern von Blöcken in einer Datenbank).
  • Die MDN-Dokumente sind wie immer großartig. Über die oben genannten Links hinaus finden Sie hier eine Anleitung zur API für lesbare Streams, die zeigt, wie Sie einen lesbaren Stream mit einem Streaming von HTTP-Antworten mithilfe von Fetch-Tag verbinden, um ihn in einer Bildanfrage zu streamen. Hinweis: In diesem Handbuch wird „response.body“ als asynchroner Iterator verwendet. Derzeit ist dies jedoch nicht weit verbreitet und auch nicht in den TypeScript-Typen.

  1. Hinweis: Sie können jeweils nur einen Leser des Streams haben, daher rufen Sie .getReader() im Allgemeinen nicht mehrmals auf – in diesem Fall möchten Sie wahrscheinlich .tee() und wenn Sie .tee() verwenden möchten. Wenn Sie getReader() aus irgendeinem Grund mehrmals ausführen, stellen Sie sicher, dass Sie zuerst .releaseLock() haben. ↩

  2. Oder alternativ können Sie Folgendes tun: Wenn Sie mit Symbol nicht vertraut sind, wird es verwendet, um Schlüssel in einem Objekt zu haben, bei denen es sich nicht um Zeichenfolgen oder Zahlen handelt. Auf diese Weise kommt es nicht zu Konflikten, wenn Sie einen Schlüssel namens asyncIterator hinzugefügt haben. Sie können mit myIterator[Symbol.asyncIterator]() auf die Funktion zugreifen. ↩

Freigabeerklärung Dieser Artikel ist abgedruckt unter: https://dev.to/ianmacartney/streaming-http-responses-using-fetch-1fm2?1 Bei Verstößen wenden Sie sich bitte an [email protected], um ihn zu löschen
Neuestes Tutorial Mehr>

Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.

Copyright© 2022 湘ICP备2022001581号-3