이 게시물에서는 HTTP 호출을 가져오고 스트리밍 응답을 청크로 수신할 수 있는 JavaScript Streams API 작업을 살펴보겠습니다. 이를 통해 클라이언트는 서버 응답에 더 많이 응답할 수 있습니다. 신속하게 ChatGPT와 같은 UI를 구축하세요.
동기 부여적인 예로, npm 종속성을 사용하지 않고 내장된 가져오기만 사용하여 OpenAI(또는 동일한 http 스트리밍 API를 사용하는 모든 서버)의 스트리밍 LLM 응답을 처리하는 기능을 구현하겠습니다. 여기에는 지수 백오프를 사용한 재시도, 임베딩, 비스트리밍 채팅, 채팅 완료 및 임베딩과 상호작용하기 위한 더 간단한 API가 포함된 전체 코드가 있습니다.
HTTP 스트림을 클라이언트에 반환하는 방법에 관심이 있다면 이 게시물을 확인하세요.
전체 예시는 다음과 같습니다. 아래에서 각 부분을 살펴보겠습니다.
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i 재시도 및 기타 개선 사항과 함께 스트리밍 및 비스트리밍 매개변수 변형에 대한 멋진 형식의 오버로드가 있는 버전은 여기에서 코드를 참조하세요.
이 게시물의 나머지 부분은 이 코드의 기능을 이해하는 것입니다.
요청하기
이 부분은 사실 매우 쉽습니다. 스트리밍 HTTP 응답은 일반 HTTP 요청에서 나옵니다.
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " process.env.LLM_API_KEY, }, body: JSON.stringify(body), });HTTP 헤더는 평소대로 전송되며 스트리밍을 활성화하기 위해 특별히 아무것도 설정할 필요가 없습니다. 또한 HTTP 스트리밍에 일반 캐싱 헤더를 계속 활용할 수 있습니다.
오류 처리
클라이언트 측의 오류에 관한 이야기는 HTTP 스트리밍의 경우 조금 안타깝습니다. 장점은 HTTP 스트리밍의 경우 클라이언트가 초기 응답에서 즉시 상태 코드를 받고 거기에서 오류를 감지할 수 있다는 것입니다. http 프로토콜의 단점은 서버가 성공을 반환했지만 스트림 중간에 중단된 경우 클라이언트에게 스트림이 중단되었음을 알려주는 프로토콜 수준의 어떤 것도 없다는 것입니다. OpenAI가 이 문제를 해결하기 위해 마지막에 "모두 완료" 센티넬을 인코딩하는 방법을 아래에서 살펴보겠습니다.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }스트림 읽기
HTTP 스트리밍 응답을 읽기 위해 클라이언트는 .getReader() 메소드를 사용하여 서버에서 들어오는 청크를 반복할 수 있는 ReadableStream인 response.body 속성을 사용할 수 있습니다. 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }이것은 우리가 반환하는 모든 데이터 비트를 처리하지만 OpenAI HTTP 프로토콜의 경우 데이터가 줄 바꿈으로 구분된 JSON일 것으로 예상하므로 대신 응답 본문을 분할하고 각 줄을 '생성'합니다. 다시 완료되었습니다. 진행 중인 줄을 lastFragment로 버퍼링하고 두 개의 줄 바꿈으로 구분된 전체 줄만 반환합니다:
// stream here is request.body async function* splitStream(stream: ReadableStream) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment = data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i 이 함수*와 Yield 구문이 익숙하지 않은 경우 function*을 루프에서 여러 항목을 반환할 수 있는 함수로 취급하고 Yield를 함수에서 여러 번 반환하는 방법으로 취급하세요.
그런 다음 다음과 같이 이 SplitStream 함수를 반복할 수 있습니다.
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }이 "for wait" 구문이 문제가 된다면 이는 "비동기 반복자"를 사용하는 것입니다. 이는 for 루프와 함께 사용하는 일반 반복자와 같지만 다음 값을 얻을 때마다 대기됩니다.
예를 들어 OpenAI에서 일부 텍스트를 얻었고 더 많은 것을 기다리고 있는 경우 for 루프는 SplitStream이 다른 값을 생성할 때까지 기다립니다. 이는 wait reader.read()가 완료되는 값을 반환할 때 발생합니다. 하나 이상의 텍스트 줄.
다음으로 호출자가 "for Wait" 루프를 사용하여 이 데이터를 반복할 수 있도록 분할Stream과 같은 함수가 아닌 비동기 반복자를 반환하는 또 다른 방법을 살펴보겠습니다.
비동기 반복자 반환
이제 전체 텍스트 줄을 반환하는 비동기 반복자가 있으므로, 그냥 SplitStream(response.body)을 반환할 수 있지만, 함수 호출자가 계속 반복하도록 하면서 각 줄을 가로채서 변환하고 싶습니다. .
이 접근 방식은 위의 async function* 구문과 유사합니다. 여기서는 호출 시 반환하는 비동기 함수 대신 비동기 반복자를 직접 반환합니다. 차이점은 먼저 호출해야 하는 AsyncGenerator 대신 AsyncIterator 유형이라는 것입니다. AsyncIterator는 다음과 같은 특정 이름의 함수를 사용하여 정의할 수 있습니다: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };splitStream에서 오는 데이터와 다른 것을 반환하려는 경우에 유용합니다. 스트리밍 HTTP 요청에서 새 라인이 들어올 때마다, SplitStream은 이를 생성하고, 이 함수는 이를 데이터로 수신하고 호출자에게 전달하기 전에 작업을 수행할 수 있습니다.
다음으로 OpenAI의 스트리밍 채팅 완료 API의 경우 이 데이터를 구체적으로 해석하는 방법을 살펴보겠습니다.
OpenAI HTTP 스트리밍 프로토콜 처리
OpenAI 응답 프로토콜은 data: 또는 event:로 시작하는 일련의 줄이지만 채팅 완료에 유용한 부분이므로 데이터 응답만 처리하겠습니다. 스트림이 완료되면 [DONE]이라는 센티널이 있고, 그렇지 않으면 JSON일 뿐입니다.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }모든 것을 하나로 모으기
이제 HTTP 스트리밍을 이해했으므로 SDK나 라이브러리에 의존하지 않고 스트리밍 API로 직접 작업하는 것에 자신감을 가질 수 있습니다. 이를 통해 여러 요청으로 더 많은 대역폭을 소비하지 않고도 UI가 즉시 업데이트를 시작할 수 있으므로 대기 시간을 숨길 수 있습니다. 공식 openai npm 패키지와 마찬가지로 위 기능을 사용할 수 있습니다:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }모델을 사전 구성하고 .choices[0].delta.content:
를 추출하여 이를 더욱 쉽게 만들 수 있는 일부 유틸리티 기능을 만들 수 있는 코드를 여기에서 확인하세요.const response = await chatStream(messages); for await (const content of response) { console.log(content); }코드를 복사하기 전에 비동기 함수 연습으로 직접 구현해 보세요.
더 많은 리소스
- 자체 서버 엔드포인트에서 HTTP 스트리밍 데이터를 반환하는 방법에 대한 자세한 내용은 OpenAI(또는 이와 유사한 것)에서 서버로 데이터를 스트리밍하고 동시에 클라이언트로 스트리밍하는 HTTP 스트리밍을 사용한 AI Chat의 이 게시물을 확인하세요. 사용자 정의 로직이 진행됩니다(예: 데이터베이스에 청크 저장).
- MDN 문서는 언제나 그렇듯 훌륭합니다. 위의 링크 외에도 읽기 가능한 스트림을 태그에 연결하여 이미지 요청을 스트리밍하는 방법을 보여주는 읽기 가능한 스트림 API에 대한 가이드가 있습니다. 참고: 이 가이드에서는 response.body를 비동기 반복자로 사용하지만 현재는 널리 구현되지 않았으며 TypeScript 유형에도 없습니다.
참고: 한 번에 하나의 스트림 리더만 가질 수 있으므로 일반적으로 .getReader()를 여러 번 호출하지 않습니다. 이 경우 아마도 .tee()를 원할 것이고 . 어떤 이유로든 getReader()를 여러 번 수행하는 경우 첫 번째 .releaseLock()이 먼저 있는지 확인하세요. ↩
또는 기호에 익숙하지 않은 경우 문자열이나 숫자가 아닌 객체에 키를 갖는 방식으로 사용됩니다. 이렇게 하면 asyncIterator라는 키를 추가해도 충돌하지 않습니다. myIterator[Symbol.asyncIterator]()를 사용하여 함수에 액세스할 수 있습니다. ↩
부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.
Copyright© 2022 湘ICP备2022001581号-3