В этом посте будет рассмотрена работа с API JavaScript Streams, который позволяет выполнять HTTP-вызов выборки и получать потоковый ответ частями, что позволяет клиенту начать отвечать на ответ сервера. быстро и создавайте пользовательские интерфейсы, такие как ChatGPT.
В качестве мотивирующего примера мы реализуем функцию для обработки потокового ответа LLM от OpenAI (или любого сервера, использующего тот же API потоковой передачи HTTP), не используя никаких зависимостей npm — только встроенную выборку. Здесь представлен полный код, включая повторы с экспоненциальной задержкой, встраивания, непотоковое чат и более простые API для взаимодействия с завершением и встраиванием чата.
Если вам интересно узнать, как также вернуть HTTP-поток клиентам, прочтите этот пост.
Вот полный пример. Мы рассмотрим каждую часть ниже:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Здесь приведен код версии, которая имеет удобные типизированные перегрузки для вариантов параметров потоковой и непотоковой передачи, а также повторные попытки и другие улучшения.
Остальная часть поста посвящена пониманию того, что делает этот код.
Оформление запроса
На самом деле эта часть очень проста. Потоковый HTTP-ответ поступает из обычного HTTP-запроса:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });Заголовки HTTP отправляются как обычно, и для включения потоковой передачи не требуется ничего устанавливать. И вы по-прежнему можете использовать обычные заголовки кэширования для потоковой передачи HTTP.
Обработка ошибок
История с ошибками на стороне клиента немного неудачна для потоковой передачи HTTP. Положительным моментом является то, что для потоковой передачи HTTP клиент сразу получает коды состояния в первоначальном ответе и может обнаружить там сбой. Недостатком протокола http является то, что если сервер возвращает успех, но затем прерывается в середине потока, на уровне протокола нет ничего, что сообщало бы клиенту, что поток был прерван. Ниже мы увидим, как OpenAI кодирует сигнал «все готово» в конце, чтобы обойти эту проблему.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }Чтение потока
Чтобы прочитать ответ потоковой передачи HTTP, клиент может использовать свойство response.body, которое представляет собой ReadableStream, позволяющее перебирать фрагменты по мере их поступления с сервера с помощью метода .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }Это обрабатывает каждый бит данных, которые мы получаем обратно, но для протокола OpenAI HTTP мы ожидаем, что данные будут в формате JSON, разделенные символами новой строки, поэтому вместо этого мы разделим тело ответа и «выдадим» каждую строку по мере их получения. повторно завершено. Мы буферизуем выполняющуюся строку в LastFragment и возвращаем только полные строки, разделенные двумя символами новой строки:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i Если эта функция* и синтаксис доходности вам незнакомы, просто рассматривайте функцию* как функцию, которая может возвращать несколько элементов в цикле, а выход — как способ многократного возврата чего-либо из функции.
Затем вы можете перебрать эту функцию SplitStream, например:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }Если синтаксис for await вас сбивает с толку, он использует так называемый «асинхронный итератор» — он похож на обычный итератор, который вы бы использовали с циклом for, но каждый раз, когда он получает следующее значение, он ожидается.
Для нашего примера, когда мы получили некоторый текст от OpenAI и ждем продолжения, цикл for будет ждать, пока SplitStream не вернет другое значение, что произойдет, когда await reader.read() вернет значение, которое завершается. одна или несколько строк текста.
Далее мы рассмотрим другой способ возврата асинхронного итератора, который не является такой функцией, как SplitStream, чтобы вызывающая сторона могла использовать цикл «for await» для перебора этих данных.
Возврат асинхронного итератора
Теперь, когда у нас есть асинхронный итератор, возвращающий полные строки текста, мы могли бы просто вернуть SplitStream(response.body), но мы хотим перехватить каждую из строк и преобразовать их, в то же время позволяя вызывающему объекту нашей функции выполнять итерацию. .
Подход аналогичен синтаксису асинхронной функции*, приведенному выше. Здесь мы вернем асинхронный итератор напрямую, а не асинхронную функцию, которая возвращает его при вызове. Разница в том, что типом является AsyncIterator вместо AsyncGenerator, который необходимо вызвать первым. AsyncIterator можно определить с помощью определенной именованной функции: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };Это полезно, если вы хотите вернуть что-то отличное от данных, поступающих из SplitStream. Каждый раз, когда из потокового HTTP-запроса поступает новая строка, функция SplitStream возвращает ее, эта функция получает ее в виде данных и может что-то сделать, прежде чем передать ее вызывающей стороне.
Далее мы рассмотрим, как интерпретировать эти данные конкретно в случае API завершения потокового чата OpenAI.
Обработка протокола потоковой передачи HTTP OpenAI
Протокол ответа OpenAI представляет собой серию строк, которые начинаются с data: или event:, но мы будем обрабатывать только ответы с данными, поскольку это полезная часть для завершения чата. Если поток завершен, отображается индикатор [DONE], в противном случае это просто JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }Собираем все это вместе
Теперь, когда вы понимаете потоковую передачу HTTP, вы можете чувствовать себя уверенно, работая напрямую с API потоковой передачи, не полагаясь на SDK или библиотеки. Это позволяет вам скрыть задержку, поскольку ваш пользовательский интерфейс может немедленно начать обновление, не потребляя больше трафика при выполнении нескольких запросов. Вы можете использовать вышеуказанную функцию так же, как и в официальном пакете openai npm:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }См. код здесь, который также позволяет вам создать некоторые служебные функции, чтобы сделать это еще проще, предварительно настроив модель и извлекая .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }Прежде чем копировать код, попробуйте реализовать его самостоятельно в качестве упражнения по асинхронным функциям.
Больше ресурсов
- Для получения информации о возврате потоковых данных HTTP с конечной точки вашего собственного сервера прочтите этот пост в AI Chat с HTTP Streaming, который одновременно передает данные из OpenAI (или аналогичного) на ваш сервер и одновременно передает их клиенту, одновременно выполняя пользовательская логика по мере ее реализации (например, сохранение фрагментов в базе данных).
- Документация MDN, как всегда, великолепна. Помимо приведенных выше ссылок, вот руководство по API читаемых потоков, которое показывает, как подключить читаемый поток к тегу для потоковой передачи в запросе изображения. Примечание. В этом руководстве в качестве асинхронного итератора используется response.body, но в настоящее время он широко не реализован и не используется в типах TypeScript.
Примечание: одновременно у вас может быть только один читатель потока, поэтому обычно вы не вызываете .getReader() несколько раз - в этом случае вам, вероятно, понадобится .tee(), и если вы хотите использовать . getReader() несколько раз по какой-то причине, сначала убедитесь, что у вас есть первый .releaseLock(). ↩
Или, альтернативно, вы можете. Если вы не знакомы с символом, он используется для того, чтобы ключи в объекте не были строками или числами. Таким образом, они не будут конфликтовать, если вы добавите ключ с именем asyncIterator. Вы можете получить доступ к функции с помощью myIterator[Symbol.asyncIterator](). ↩
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3