async direct I/O per-tensor test

slaren 2024-05-22 01:08:52 +02:00
parent 46db3506aa
commit e9095e6098

llama.cpp (152 changed lines)

@@ -1583,91 +1583,6 @@ protected:
llama_mmap() {}
};
struct llama_anonymous_mmap : llama_mmap {
llama_file * file;
llama_anonymous_mmap(const llama_anonymous_mmap &) = delete;
#ifdef _POSIX_MAPPED_FILES
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
llama_anonymous_mmap(struct llama_file * file) {
this->file = file;
size = file->size;
addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if (addr == MAP_FAILED) { // NOLINT
throw std::runtime_error(format("mmap(.., MAP_ANONYMOUS) failed: %s", strerror(errno)));
}
#ifdef __linux__
// hint the kernel to back this mapping with transparent huge pages (required when THP is in madvise mode)
if (madvise(addr, size, MADV_HUGEPAGE)) {
LLAMA_LOG_WARN("warning: madvise(.., MADV_HUGEPAGE) failed: %s\n", strerror(errno));
}
#endif
mapped_fragments.emplace_back(0, size);
}
void populate(size_t first, size_t last) const override {
int page_size = sysconf(_SC_PAGESIZE);
align_to_previous_page(&first, page_size);
align_to_next_page(&last, page_size);
size_t bytes_read = file->read_direct((char *) addr + first, last - first, first);
if (bytes_read != std::min(last, file->size) - first) {
throw std::runtime_error("unexpectedly reached end of file");
}
}
#elif defined(_WIN32)
llama_anonymous_mmap(struct llama_file * file) {
this->file = file;
size = file->size;
HANDLE hMapping = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, size >> 32, size, NULL);
if (hMapping == NULL) {
throw std::runtime_error(format("CreateFileMapping failed: %s", llama_format_win_err(GetLastError()).c_str()));
}
addr = MapViewOfFile(hMapping, FILE_MAP_ALL_ACCESS, 0, 0, size);
DWORD dwError = GetLastError();
CloseHandle(hMapping);
if (addr == NULL) {
throw std::runtime_error(format("MapViewOfFile failed: %s", llama_format_win_err(dwError).c_str()));
}
}
void populate(size_t first, size_t last) const override {
SYSTEM_INFO siSysInfo;
GetSystemInfo(&siSysInfo);
DWORD dwPageSize = siSysInfo.dwPageSize;
align_to_previous_page(&first, dwPageSize);
align_to_next_page(&last, dwPageSize);
size_t bytes_read = file->read_direct((char *) addr + first, last - first, first);
if (bytes_read != std::min(last, file->size) - first) {
throw std::runtime_error("unexpectedly reached end of file");
}
}
#else
llama_anonymous_mmap(struct llama_file * file) {
GGML_UNUSED(file);
throw std::runtime_error("mmap not supported");
}
void populate(size_t first, size_t last) const override {
GGML_UNUSED(first);
GGML_UNUSED(last);
throw std::runtime_error("mmap not supported");
}
#endif
};
using llama_mmaps = std::vector<std::unique_ptr<llama_mmap>>;
// Represents some region of memory being locked using mlock or VirtualLock;
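
The populate() overrides above depend on llama.cpp's align_to_previous_page/align_to_next_page helpers, since direct reads must start and end on page boundaries. A minimal sketch of what those helpers do, assuming a power-of-two page size (the real definitions live elsewhere in llama.cpp; the bodies here are illustrative):

#include <cstddef>

// Round an offset down to the start of its page (power-of-two page_size assumed).
static void align_to_previous_page(size_t * offset, size_t page_size) {
    *offset &= ~(page_size - 1);
}

// Round an offset up to the next page boundary; a no-op if already aligned.
static void align_to_next_page(size_t * offset, size_t page_size) {
    size_t in_page = *offset & (page_size - 1);
    if (in_page != 0) {
        *offset += page_size - in_page;
    }
}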
@@ -3470,13 +3385,16 @@ struct llama_model_loader {
}
// either file or anonymous mappings
this->use_mmap = use_mmap || use_direct_io;
this->use_mmap = use_mmap && !use_direct_io;
this->use_direct_io = use_direct_io;
this->check_tensors = check_tensors;
}
~llama_model_loader() {
for (auto & task : load_tasks) {
task.wait();
}
if (meta) {
gguf_free(meta);
}
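
Two things change here: use_mmap and use_direct_io become mutually exclusive (direct I/O no longer piggybacks on an anonymous mapping, it streams each tensor through its own staging buffer), and the destructor now waits for any in-flight load tasks before freeing the loader's resources. A std::future returned by std::async would block in its destructor anyway, but the explicit wait() makes the ordering relative to gguf_free(meta) unmistakable.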
@@ -3668,12 +3586,12 @@ struct llama_model_loader {
}
}
void init_mappings(bool prefetch = true, llama_mlocks * mlock_mmaps = nullptr, bool anonymous = false) {
void init_mappings(bool prefetch = true, llama_mlocks * mlock_mmaps = nullptr) {
if (use_mmap) {
mappings.reserve(files.size());
mmaps_used.reserve(files.size());
for (const auto & file : files) {
std::unique_ptr<llama_mmap> mapping(anonymous ? new llama_anonymous_mmap(file.get()) : new llama_mmap(file.get(), prefetch ? -1 : 0, ggml_is_numa()));
std::unique_ptr<llama_mmap> mapping(new llama_mmap(file.get(), prefetch ? -1 : 0, ggml_is_numa()));
mmaps_used.emplace_back(mapping->size, 0);
if (mlock_mmaps) {
std::unique_ptr<llama_mlock> mlock_mmap(new llama_mlock());
@@ -3741,6 +3659,26 @@ struct llama_model_loader {
size_t size_done = 0;
size_t size_data = 0;
std::vector<std::pair<size_t, size_t>> mmaps_used;
std::vector<std::future<void>> load_tasks;
void add_load_task(llama_file * file, const llama_tensor_weight * weight, ggml_tensor * cur) {
load_tasks.emplace_back(std::async(std::launch::async, [file, cur, weight] {
size_t page_size = sysconf(_SC_PAGESIZE);
// round the file offset down and the read length up to page boundaries for direct I/O
size_t aligned_offset = weight->offs & ~(page_size - 1);
size_t diff = weight->offs - aligned_offset;
size_t read_size = GGML_PAD(ggml_nbytes(cur) + diff, page_size);
// over-allocate by one page so the staging pointer can be rounded up to a page boundary
std::vector<no_init<uint8_t>> read_buf(read_size + page_size);
uint8_t * read_buf_dio = (uint8_t *) read_buf.data();
size_t page_offset = (uintptr_t) read_buf_dio % page_size;
if (page_offset > 0) {
read_buf_dio += page_size - page_offset;
}
size_t bytes_read = file->read_direct(read_buf_dio, read_size, aligned_offset);
if (bytes_read < diff + ggml_nbytes(cur)) {
throw std::runtime_error("unexpectedly reached end of file");
}
ggml_backend_tensor_set(cur, read_buf_dio + diff, 0, ggml_nbytes(cur));
}));
}
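
Direct I/O typically requires the file offset, transfer length, and buffer address to all be page-aligned, which is what the arithmetic above arranges (note that sysconf(_SC_PAGESIZE) keeps this path POSIX-only). A standalone walk-through of the same computation with illustrative numbers:

#include <cstdio>
#include <cstddef>

int main() {
    const size_t page_size = 4096;                    // typical page size
    const size_t offs      = 10000;                   // tensor offset within the file
    const size_t nbytes    = 7000;                    // tensor size in bytes

    size_t aligned_offset = offs & ~(page_size - 1);  // 8192: offset rounded down to a page
    size_t diff           = offs - aligned_offset;    // 1808: where the tensor starts in the read
    // GGML_PAD(nbytes + diff, page_size): length rounded up to a whole page -> 12288
    size_t read_size      = (nbytes + diff + page_size - 1) & ~(page_size - 1);

    printf("read %zu bytes at offset %zu; tensor data begins at +%zu\n",
           read_size, aligned_offset, diff);
    return 0;
}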
// Returns false if cancelled by progress_callback
bool load_all_data(
@@ -3751,15 +3689,6 @@ struct llama_model_loader {
void * progress_callback_user_data) {
GGML_ASSERT(size_data != 0 && "call init_mappings() first");
if (use_mmap) {
for (uint32_t idx = 0; idx < files.size(); idx++) {
void * addr = nullptr;
size_t first, last;
get_mapping_range(&first, &last, &addr, idx, ctx);
mappings.at(idx)->populate(first, last);
}
}
std::vector<no_init<uint8_t>> read_buf;
std::vector<std::future<std::pair<ggml_tensor *, bool>>> validation_result;
@@ -3811,7 +3740,11 @@ struct llama_model_loader {
const auto & file = files.at(weight->idx);
if (ggml_backend_buffer_is_host(cur->buffer)) {
file->seek(weight->offs, SEEK_SET);
file->read_raw(cur->data, n_size);
if (use_direct_io) {
add_load_task(file.get(), weight, cur);
} else {
file->read_raw(cur->data, n_size);
}
if (check_tensors) {
validation_result.emplace_back(std::async(std::launch::async, [cur, n_size] {
return std::make_pair(cur, ggml_validate_row_data(cur->type, cur->data, n_size));
@@ -3820,14 +3753,31 @@ struct llama_model_loader {
} else {
read_buf.resize(n_size);
file->seek(weight->offs, SEEK_SET);
file->read_raw(read_buf.data(), n_size);
ggml_backend_tensor_set(cur, read_buf.data(), 0, n_size);
if (use_direct_io) {
add_load_task(file.get(), weight, cur);
} else {
file->read_raw(read_buf.data(), n_size);
ggml_backend_tensor_set(cur, read_buf.data(), 0, n_size);
}
if (check_tensors && !ggml_validate_row_data(cur->type, read_buf.data(), n_size)) {
throw std::runtime_error(format("tensor '%s' has invalid data", ggml_get_name(cur)));
}
}
}
const size_t max_load_tasks = 8;
while (load_tasks.size() >= max_load_tasks) {
// n_tasks rather than n_size: the outer n_size (the tensor byte count) is still needed below
size_t n_tasks = load_tasks.size();
for (size_t i = 0; i < n_tasks; i++) {
auto & future = load_tasks.at(i);
if (future.wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
future.get();
load_tasks.erase(load_tasks.begin() + i);
break;
}
}
}
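
The loop above caps the number of in-flight reads at max_load_tasks, polling each pending future briefly and reaping the first one that completes, which bounds both thread count and staging-buffer memory. The same throttle in isolation (a sketch, not the loader's actual API):

#include <chrono>
#include <future>
#include <vector>

// Block until fewer than `limit` tasks remain, reaping completed ones.
// future::get() also rethrows any exception raised inside a task.
static void throttle(std::vector<std::future<void>> & tasks, size_t limit) {
    while (tasks.size() >= limit) {
        for (size_t i = 0; i < tasks.size(); i++) {
            if (tasks[i].wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
                tasks[i].get();
                tasks.erase(tasks.begin() + i);
                break;
            }
        }
    }
}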
size_done += n_size;
}
@@ -6219,7 +6169,7 @@ static bool llm_load_tensors(
ml.done_getting_tensors();
ml.init_mappings(true, use_mlock ? &model.mlock_mmaps : nullptr, /* anonymous */ ml.use_direct_io);
ml.init_mappings(true, use_mlock ? &model.mlock_mmaps : nullptr);
model.mappings.reserve(ml.mappings.size());
// create the backend buffers