server : fix cleaning up stream task (#11418)

* server : fix cleaning up stream task

* one more spot
This commit is contained in:
Xuan Son Nguyen 2025-01-25 16:36:44 +01:00 committed by GitHub
parent 20a758155b
commit 49b0e3cec4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1427,16 +1427,16 @@ struct server_queue {
int post(server_task task, bool front = false) { int post(server_task task, bool front = false) {
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
GGML_ASSERT(task.id != -1); GGML_ASSERT(task.id != -1);
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
QUE_DBG("new task, id = %d, front = %d\n", task.id, front); QUE_DBG("new task, id = %d, front = %d\n", task.id, front);
if (front) { if (front) {
queue_tasks.push_front(std::move(task)); queue_tasks.push_front(std::move(task));
} else { } else {
queue_tasks.push_back(std::move(task)); queue_tasks.push_back(std::move(task));
} }
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
condition_tasks.notify_one(); condition_tasks.notify_one();
return task.id; return task.id;
} }
@ -1448,16 +1448,16 @@ struct server_queue {
if (task.id == -1) { if (task.id == -1) {
task.id = id++; task.id = id++;
} }
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front); QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front);
if (front) { if (front) {
queue_tasks.push_front(std::move(task)); queue_tasks.push_front(std::move(task));
} else { } else {
queue_tasks.push_back(std::move(task)); queue_tasks.push_back(std::move(task));
} }
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
} }
condition_tasks.notify_one(); condition_tasks.notify_one();
return 0; return 0;
@ -1554,10 +1554,10 @@ struct server_queue {
} }
private: private:
void cleanup_pending_task(int id_task) { void cleanup_pending_task(int id_target) {
// no need lock because this is called exclusively by post() // no need lock because this is called exclusively by post()
auto rm_func = [id_task](const server_task & task) { auto rm_func = [id_target](const server_task & task) {
return task.id_target == id_task; return task.id_target == id_target;
}; };
queue_tasks.erase( queue_tasks.erase(
std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func), std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func),