سينظر هذا المنشور في العمل مع JavaScript Streams API الذي يسمح بإجراء استدعاء HTTP وتلقي استجابة متدفقة في أجزاء، مما يسمح للعميل ببدء الاستجابة لاستجابة الخادم أكثر بسرعة وإنشاء واجهات مستخدم مثل ChatGPT.
كمثال تحفيزي، سنقوم بتنفيذ وظيفة للتعامل مع استجابة LLM المتدفقة من OpenAI (أو أي خادم يستخدم نفس واجهة برمجة تطبيقات تدفق http)، دون استخدام تبعيات npm - فقط الجلب المدمج. الكود الكامل موجود هنا بما في ذلك إعادة المحاولة مع التراجع الأسي، والتضمين، والدردشة غير المتدفقة، وواجهات برمجة التطبيقات الأكثر بساطة للتفاعل مع عمليات إكمال الدردشة والتضمين.
إذا كنت مهتمًا بمعرفة كيفية إرجاع تدفق HTTP أيضًا إلى العملاء، فاطلع على هذا المنشور.
إليك المثال الكامل. سننظر إلى كل قطعة أدناه:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i راجع الكود هنا للحصول على إصدار يحتوي على أحمال زائدة مكتوبة بشكل جيد لمتغيرات معلمات البث وغير الدفق، إلى جانب عمليات إعادة المحاولة والتحسينات الأخرى.
تدور بقية المقالة حول فهم ما يفعله هذا الرمز.
تقديم الطلب
هذا الجزء في الواقع سهل للغاية. تأتي استجابة HTTP المتدفقة من طلب HTTP عادي:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });يتم إرسال رؤوس HTTP كالمعتاد، ولا يتعين عليك تعيين أي شيء على وجه الخصوص لتمكين البث. ولا يزال بإمكانك الاستفادة من رؤوس التخزين المؤقت العادية لتدفق HTTP.
التعامل مع الأخطاء
تعتبر القصة المتعلقة بالأخطاء من جانب العميل مؤسفة بعض الشيء بالنسبة لتدفق HTTP. الجانب الإيجابي هو أنه بالنسبة لتدفق HTTP، يحصل العميل على رموز الحالة على الفور في الاستجابة الأولية ويمكنه اكتشاف الفشل هناك. الجانب السلبي لبروتوكول http هو أنه إذا عاد الخادم بنجاح ثم انقطع في منتصف الدفق، فلا يوجد أي شيء على مستوى البروتوكول يخبر العميل بأن الدفق قد تمت مقاطعته. سنرى أدناه كيف يقوم OpenAI بتشفير حارس "تم إنجاز كل شيء" في النهاية للتغلب على هذه المشكلة.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }قراءة الدفق
من أجل قراءة استجابة تدفق HTTP، يمكن للعميل استخدام خاصية Response.body وهي عبارة عن ReadableStream مما يسمح لك بالتكرار على المقاطع عند وصولها من الخادم باستخدام طريقة .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }يعالج هذا كل جزء من البيانات التي نستعيدها، ولكن بالنسبة لبروتوكول OpenAI HTTP، نتوقع أن تكون البيانات مفصولة بتنسيق JSON بأسطر جديدة، لذلك بدلاً من ذلك سنقوم بتقسيم نص الاستجابة و"إنتاج" كل سطر كما هو. إعادة الانتهاء. نقوم بتخزين السطر قيد التقدم في lastFragment ونعيد فقط الأسطر الكاملة التي تم فصلها بسطرين جديدين:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i إذا كانت هذه الوظيفة* وصياغة العائد غير مألوفة لك، فما عليك سوى التعامل مع الوظيفة* كدالة يمكنها إرجاع أشياء متعددة في حلقة، وإنتاجها كطريقة لإرجاع شيء ما عدة مرات من الوظيفة.
يمكنك بعد ذلك تكرار وظيفة SplitStream هذه مثل:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }إذا أزعجتك صياغة "for Wait"، فإنها تستخدم ما يسمى "مكرر غير متزامن" - مثل مكرر عادي تستخدمه مع حلقة for، ولكن في كل مرة تحصل على القيمة التالية، يتم انتظارها.
على سبيل المثال، عندما نحصل على بعض النصوص من OpenAI وننتظر المزيد، ستنتظر حلقة for حتى ينتج SplitStream قيمة أخرى، وهو ما سيحدث عندما يُرجع Wait Reader.read() قيمة تنتهي سطر واحد أو أكثر من النص.
بعد ذلك، سننظر في طريقة أخرى لإرجاع مكرر غير متزامن لا يمثل وظيفة مثل SplitStream، بحيث يمكن للمتصل استخدام حلقة "للانتظار" للتكرار على هذه البيانات.
إرجاع مكرر غير متزامن
الآن بعد أن أصبح لدينا مكرر غير متزامن يعيد أسطرًا كاملة من النص، يمكننا فقط إرجاع SplitStream(response.body)، لكننا نريد اعتراض كل سطر وتحويله، مع الاستمرار في السماح لمستدعي وظيفتنا بالتكرار .
يشبه هذا النهج بناء جملة الدالة غير المتزامنة* أعلاه. سنعيد هنا مكررًا غير متزامن مباشرةً، بدلاً من دالة غير متزامنة تُرجع واحدًا عند استدعائها. الفرق هو أن النوع هو AsyncIterator بدلاً من AsyncGenerator الذي يجب استدعاؤه أولاً. يمكن تعريف AsyncIterator من خلال وجود وظيفة مسماة معينة: الرمز.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };يعد هذا مفيدًا عندما تريد إرجاع شيء مختلف عن البيانات الواردة من SplitStream. في كل مرة يأتي سطر جديد من طلب HTTP المتدفق، سينتجه SplitStream، وستستقبله هذه الوظيفة في البيانات ويمكنها القيام بشيء ما قبل تسليمه إلى المتصل الخاص بها.
سننظر بعد ذلك في كيفية تفسير هذه البيانات على وجه التحديد في حالة واجهة برمجة تطبيقات إكمال الدردشة المتدفقة من OpenAI.
التعامل مع بروتوكول تدفق OpenAI HTTP
بروتوكول استجابة OpenAI عبارة عن سلسلة من الأسطر التي تبدأ بالبيانات: أو الحدث:، ولكننا سنتعامل فقط مع استجابات البيانات، نظرًا لأن هذا هو الجزء المفيد لإكمال الدردشة. هناك حارس [تم] إذا انتهى البث، وإلا فسيكون JSON فقط.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }جمع كل ذلك معا
الآن بعد أن فهمت تدفق HTTP، يمكنك أن تشعر بالثقة في العمل مباشرة مع واجهات برمجة التطبيقات المتدفقة دون الاعتماد على أدوات تطوير البرمجيات (sdks) أو المكتبات. يتيح لك هذا إخفاء زمن الاستجابة، حيث يمكن لواجهة المستخدم الخاصة بك أن تبدأ في التحديث على الفور، دون استهلاك المزيد من النطاق الترددي مع طلبات متعددة. يمكنك استخدام الوظيفة المذكورة أعلاه كما تفعل مع حزمة openai npm الرسمية:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }راجع الكود هنا الذي يتيح لك أيضًا إجراء بعض الوظائف المساعدة لتسهيل ذلك عن طريق تكوين النموذج مسبقًا واستخراج .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }قبل نسخ الكود، حاول تنفيذه بنفسك كتمرين على وظائف غير متزامنة.
المزيد من الموارد
- للحصول على معلومات حول إرجاع بيانات تدفق HTTP من نقطة نهاية الخادم الخاص بك، راجع هذا المنشور على AI Chat with HTTP Streaming الذي يقوم بتدفق البيانات من OpenAI (أو ما شابه) إلى الخادم الخاص بك ويقوم في نفس الوقت بتدفقها إلى العميل، أثناء القيام بذلك منطق مخصص كما هو الحال (مثل حفظ الأجزاء في قاعدة بيانات).
- مستندات MDN، كما هو الحال دائمًا، رائعة. بالإضافة إلى الروابط أعلاه، إليك دليل حول واجهة برمجة التطبيقات للتدفقات القابلة للقراءة والتي توضح كيفية توصيل تدفق قابل للقراءة بعلامة للتدفق في طلب صورة. ملاحظة: يستخدم هذا الدليل Response.body كمكرر غير متزامن، ولكن حاليًا لا يتم تنفيذ ذلك على نطاق واسع وليس في أنواع TypeScript.
ملاحظة: لا يمكن أن يكون لديك سوى قارئ واحد للتدفق في المرة الواحدة، لذلك لا تتصل بشكل عام بـ .getReader() عدة مرات - فمن المحتمل أنك تريد .tee() في هذه الحالة، وإذا كنت تريد استخدام . getReader() عدة مرات لسبب ما، تأكد من حصولك على أول .releaseLock() أولاً. ↩
أو بدلاً من ذلك، إذا لم تكن على دراية بالرمز، فسيتم استخدامه بطريقة للحصول على مفاتيح في كائن ليس عبارة عن سلاسل أو أرقام. وبهذه الطريقة لا يتعارضان إذا أضفت مفتاحًا باسم asyncIterator. يمكنك الوصول إلى الوظيفة باستخدام myIterator[Symbol.asyncIterator](). ↩
تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.
Copyright© 2022 湘ICP备2022001581号-3