From 5f631c26794b6371fcf2660e8d0c53494a5575f7 Mon Sep 17 00:00:00 2001
From: Stephen Nichols
Date: Fri, 4 Aug 2023 06:37:24 -0500
Subject: [PATCH] Fixing race condition in server and partial stream handling
 in frontend. (#2391)

* Fixing race condition in server.cpp and partial stream handling in completion.js

* Reverting assert edits.

* Adding newline to eof
---
 examples/server/public/completion.js | 53 +++++++++++++++++++---------
 examples/server/server.cpp           |  6 +++-
 2 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/examples/server/public/completion.js b/examples/server/public/completion.js
index a43d5a7d5..0c9bd5f10 100644
--- a/examples/server/public/completion.js
+++ b/examples/server/public/completion.js
@@ -43,6 +43,7 @@ export async function* llama(prompt, params = {}, config = {}) {
   const decoder = new TextDecoder();
 
   let content = "";
+  let leftover = ""; // Buffer for partially read lines
 
   try {
     let cont = true;
@@ -53,29 +54,47 @@ export async function* llama(prompt, params = {}, config = {}) {
         break;
       }
 
-      // sse answers in the form multiple lines of: value\n with data always present as a key. in our case we
-      // mainly care about the data: key here, which we expect as json
-      const text = decoder.decode(result.value);
+      // Add any leftover data to the current chunk of data
+      const text = leftover + decoder.decode(result.value);
 
-      // parse all sse events and add them to result
-      const regex = /^(\S+):\s(.*)$/gm;
-      for (const match of text.matchAll(regex)) {
-        result[match[1]] = match[2]
+      // Check if the last character is a line break
+      const endsWithLineBreak = text.endsWith('\n');
+
+      // Split the text into lines
+      let lines = text.split('\n');
+
+      // If the text doesn't end with a line break, then the last line is incomplete
+      // Store it in leftover to be added to the next chunk of data
+      if (!endsWithLineBreak) {
+        leftover = lines.pop();
+      } else {
+        leftover = ""; // Reset leftover if we have a line break at the end
       }
 
-      // since we know this is llama.cpp, let's just decode the json in data
-      result.data = JSON.parse(result.data);
-      content += result.data.content;
+      // Parse all sse events and add them to result
+      const regex = /^(\S+):\s(.*)$/gm;
+      for (const line of lines) {
+        const match = regex.exec(line);
+        if (match) {
+          result[match[1]] = match[2]
+          // since we know this is llama.cpp, let's just decode the json in data
+          if (result.data) {
+            result.data = JSON.parse(result.data);
+            content += result.data.content;
 
-      // yield
-      yield result;
+            // yield
+            yield result;
 
-      // if we got a stop token from server, we will break here
-      if (result.data.stop) {
-        if (result.data.generation_settings) {
-          generation_settings = result.data.generation_settings;
+            // if we got a stop token from server, we will break here
+            if (result.data.stop) {
+              if (result.data.generation_settings) {
+                generation_settings = result.data.generation_settings;
+              }
+              cont = false;
+              break;
+            }
+          }
         }
-        break;
       }
     }
   } catch (e) {
diff --git a/examples/server/server.cpp b/examples/server/server.cpp
index c0725088f..6f7a66da1 100644
--- a/examples/server/server.cpp
+++ b/examples/server/server.cpp
@@ -1274,7 +1274,11 @@ int main(int argc, char **argv)
                 sink.done();
                 return true;
             };
-            res.set_chunked_content_provider("text/event-stream", chunked_content_provider);
+            const auto on_complete = [&](bool) {
+                llama.mutex.unlock();
+            };
+            lock.release();
+            res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete);
         }
     });
     svr.Get("/model.json", [&llama](const Request &, Response &res)