From db10f01310beea8a1ef7798651b9d692fd1149d0 Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Mon, 20 May 2024 16:36:55 +0300 Subject: [PATCH] rpc : track allocated buffers (#7411) * rpc : track allocated buffers ref: #7407 * rpc : pack rpc_tensor tightly --- ggml-rpc.cpp | 229 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 176 insertions(+), 53 deletions(-) diff --git a/ggml-rpc.cpp b/ggml-rpc.cpp index 4a9bfa52d..cc1d3ace1 100644 --- a/ggml-rpc.cpp +++ b/ggml-rpc.cpp @@ -56,6 +56,7 @@ struct socket_t { }; // ggml_tensor is serialized into rpc_tensor +#pragma pack(push, 1) struct rpc_tensor { uint64_t id; uint32_t type; @@ -71,6 +72,7 @@ struct rpc_tensor { uint64_t data; char name[GGML_MAX_NAME]; }; +#pragma pack(pop) // RPC commands enum rpc_cmd { @@ -340,23 +342,6 @@ static rpc_tensor serialize_tensor(const ggml_tensor * tensor) { return result; } -static ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor) { - ggml_tensor * result = ggml_new_tensor_4d(ctx, (ggml_type) tensor->type, - tensor->ne[0], tensor->ne[1], tensor->ne[2], tensor->ne[3]); - for (uint32_t i = 0; i < GGML_MAX_DIMS; i++) { - result->nb[i] = tensor->nb[i]; - } - result->buffer = reinterpret_cast(tensor->buffer); - result->op = (ggml_op) tensor->op; - for (uint32_t i = 0; i < GGML_MAX_OP_PARAMS / sizeof(int32_t); i++) { - result->op_params[i] = tensor->op_params[i]; - } - result->flags = tensor->flags; - result->data = reinterpret_cast(tensor->data); - ggml_set_name(result, tensor->name); - return result; -} - GGML_CALL static void ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor) { UNUSED(buffer); if (ggml_is_quantized(tensor->type)) { @@ -465,13 +450,15 @@ GGML_CALL static ggml_backend_buffer_t ggml_backend_rpc_buffer_type_alloc_buffer memcpy(&remote_ptr, output.data(), sizeof(remote_ptr)); size_t remote_size; memcpy(&remote_size, output.data() + sizeof(uint64_t), sizeof(remote_size)); - - ggml_backend_buffer_t buffer = ggml_backend_buffer_init(buft, - ggml_backend_rpc_buffer_interface, - new ggml_backend_rpc_buffer_context{buft_ctx->sock, {}, remote_ptr, "RPC"}, - remote_size); - - return buffer; + if (remote_ptr != 0) { + ggml_backend_buffer_t buffer = ggml_backend_buffer_init(buft, + ggml_backend_rpc_buffer_interface, + new ggml_backend_rpc_buffer_context{buft_ctx->sock, {}, remote_ptr, "RPC"}, + remote_size); + return buffer; + } else { + return nullptr; + } } static size_t get_alignment(const std::shared_ptr & sock) { @@ -658,7 +645,7 @@ GGML_CALL ggml_backend_t ggml_backend_rpc_init(const char * endpoint) { } } #endif - GGML_PRINT_DEBUG("Connecting to %s\n", endpoint); + fprintf(stderr, "Connecting to %s\n", endpoint); std::string host; int port; if (!parse_endpoint(endpoint, host, port)) { @@ -731,22 +718,61 @@ GGML_API GGML_CALL void ggml_backend_rpc_get_device_memory(const char * endpoint // RPC server-side implementation -static void rpc_alloc_buffer(ggml_backend_t backend, const std::vector & input, std::vector & output) { +class rpc_server { +public: + rpc_server(ggml_backend_t backend) : backend(backend) {} + ~rpc_server(); + + bool alloc_buffer(const std::vector & input, std::vector & output); + void get_alignment(std::vector & output); + void get_max_size(std::vector & output); + bool buffer_get_base(const std::vector & input, std::vector & output); + bool free_buffer(const std::vector & input); + bool buffer_clear(const std::vector & input); + bool set_tensor(const std::vector & input); + bool get_tensor(const std::vector & input, std::vector & output); + bool copy_tensor(const std::vector & input, std::vector & output); + bool graph_compute(const std::vector & input, std::vector & output); + +private: + ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor); + ggml_tensor * create_node(uint64_t id, + struct ggml_context * ctx, + const std::unordered_map & tensor_ptrs, + std::unordered_map & tensor_map); + + + ggml_backend_t backend; + std::unordered_set buffers; +}; + +bool rpc_server::alloc_buffer(const std::vector & input, std::vector & output) { // input serialization format: | size (8 bytes) | + if (input.size() != sizeof(uint64_t)) { + return false; + } uint64_t size; memcpy(&size, input.data(), sizeof(size)); ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend); ggml_backend_buffer_t buffer = ggml_backend_buft_alloc_buffer(buft, size); - uint64_t remote_ptr = reinterpret_cast(buffer); - uint64_t remote_size = buffer->size; - GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, size, remote_ptr, remote_size); + uint64_t remote_ptr = 0; + uint64_t remote_size = 0; + if (buffer != nullptr) { + remote_ptr = reinterpret_cast(buffer); + remote_size = buffer->size; + GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, size, remote_ptr, remote_size); + buffers.insert(buffer); + } else { + GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> failed\n", __func__, size); + } // output serialization format: | remote_ptr (8 bytes) | remote_size (8 bytes) | output.resize(2*sizeof(uint64_t), 0); memcpy(output.data(), &remote_ptr, sizeof(remote_ptr)); memcpy(output.data() + sizeof(uint64_t), &remote_size, sizeof(remote_size)); + return true; } -static void rpc_get_alignment(ggml_backend_t backend, std::vector & output) { +void rpc_server::get_alignment(std::vector & output) { ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend); size_t alignment = ggml_backend_buft_get_alignment(buft); GGML_PRINT_DEBUG("[%s] alignment: %lu\n", __func__, alignment); @@ -755,7 +781,7 @@ static void rpc_get_alignment(ggml_backend_t backend, std::vector & out memcpy(output.data(), &alignment, sizeof(alignment)); } -static void rpc_get_max_size(ggml_backend_t backend, std::vector & output) { +void rpc_server::get_max_size(std::vector & output) { ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend); size_t max_size = ggml_backend_buft_get_max_size(buft); GGML_PRINT_DEBUG("[%s] max_size: %lu\n", __func__, max_size); @@ -764,41 +790,90 @@ static void rpc_get_max_size(ggml_backend_t backend, std::vector & outp memcpy(output.data(), &max_size, sizeof(max_size)); } -static void rpc_buffer_get_base(const std::vector & input, std::vector & output) { +bool rpc_server::buffer_get_base(const std::vector & input, std::vector & output) { // input serialization format: | remote_ptr (8 bytes) | + if (input.size() != sizeof(uint64_t)) { + return false; + } uint64_t remote_ptr; memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, remote_ptr); ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + if (buffers.find(buffer) == buffers.end()) { + GGML_PRINT_DEBUG("[%s] buffer not found\n", __func__); + return false; + } void * base = ggml_backend_buffer_get_base(buffer); // output serialization format: | base_ptr (8 bytes) | uint64_t base_ptr = reinterpret_cast(base); output.resize(sizeof(uint64_t), 0); memcpy(output.data(), &base_ptr, sizeof(base_ptr)); + return true; } -static void rpc_free_buffer(const std::vector & input) { +bool rpc_server::free_buffer(const std::vector & input) { // input serialization format: | remote_ptr (8 bytes) | + if (input.size() != sizeof(uint64_t)) { + return false; + } uint64_t remote_ptr; memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, remote_ptr); ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + if (buffers.find(buffer) == buffers.end()) { + GGML_PRINT_DEBUG("[%s] buffer not found\n", __func__); + return false; + } ggml_backend_buffer_free(buffer); + buffers.erase(buffer); + return true; } -static void rpc_buffer_clear(const std::vector & input) { +bool rpc_server::buffer_clear(const std::vector & input) { // input serialization format: | remote_ptr (8 bytes) | value (1 byte) | + if (input.size() != sizeof(uint64_t) + sizeof(uint8_t)) { + return false; + } uint64_t remote_ptr; memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); uint8_t value; memcpy(&value, input.data() + sizeof(uint64_t), sizeof(value)); GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, remote_ptr, value); ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + if (buffers.find(buffer) == buffers.end()) { + GGML_PRINT_DEBUG("[%s] buffer not found\n", __func__); + return false; + } ggml_backend_buffer_clear(buffer, value); + return true; } -static void rpc_set_tensor(const std::vector & input) { +ggml_tensor * rpc_server::deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor) { + ggml_tensor * result = ggml_new_tensor_4d(ctx, (ggml_type) tensor->type, + tensor->ne[0], tensor->ne[1], tensor->ne[2], tensor->ne[3]); + for (uint32_t i = 0; i < GGML_MAX_DIMS; i++) { + result->nb[i] = tensor->nb[i]; + } + result->buffer = reinterpret_cast(tensor->buffer); + if (result->buffer && buffers.find(result->buffer) == buffers.end()) { + return nullptr; + } + result->op = (ggml_op) tensor->op; + for (uint32_t i = 0; i < GGML_MAX_OP_PARAMS / sizeof(int32_t); i++) { + result->op_params[i] = tensor->op_params[i]; + } + result->flags = tensor->flags; + result->data = reinterpret_cast(tensor->data); + ggml_set_name(result, tensor->name); + return result; +} + + +bool rpc_server::set_tensor(const std::vector & input) { // serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) | + if (input.size() < sizeof(rpc_tensor) + sizeof(uint64_t)) { + return false; + } const rpc_tensor * in_tensor = (const rpc_tensor *)input.data(); uint64_t offset; memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset)); @@ -811,14 +886,23 @@ static void rpc_set_tensor(const std::vector & input) { }; struct ggml_context * ctx = ggml_init(params); ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor); + if (tensor == nullptr) { + GGML_PRINT_DEBUG("[%s] error deserializing tensor\n", __func__); + ggml_free(ctx); + return false; + } GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size); const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset); ggml_backend_tensor_set(tensor, data, offset, size); ggml_free(ctx); + return true; } -static void rpc_get_tensor(const std::vector & input, std::vector & output) { +bool rpc_server::get_tensor(const std::vector & input, std::vector & output) { // serialization format: | rpc_tensor | offset (8 bytes) | size (8 bytes) | + if (input.size() != sizeof(rpc_tensor) + 2*sizeof(uint64_t)) { + return false; + } const rpc_tensor * in_tensor = (const rpc_tensor *)input.data(); uint64_t offset; memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset)); @@ -832,15 +916,24 @@ static void rpc_get_tensor(const std::vector & input, std::vectorbuffer, tensor->data, offset, size); // output serialization format: | data (size bytes) | output.resize(size, 0); ggml_backend_tensor_get(tensor, output.data(), offset, size); ggml_free(ctx); + return true; } -static void rpc_copy_tensor(const std::vector & input, std::vector & output) { +bool rpc_server::copy_tensor(const std::vector & input, std::vector & output) { // serialization format: | rpc_tensor src | rpc_tensor dst | + if (input.size() != 2*sizeof(rpc_tensor)) { + return false; + } const rpc_tensor * rpc_src = (const rpc_tensor *)input.data(); const rpc_tensor * rpc_dst = (const rpc_tensor *)(input.data() + sizeof(rpc_src)); @@ -852,18 +945,24 @@ static void rpc_copy_tensor(const std::vector & input, std::vectorbuffer: %p, dst->buffer: %p\n", __func__, (void*)src->buffer, (void*)dst->buffer); bool result = ggml_backend_buffer_copy_tensor(src, dst); // output serialization format: | result (1 byte) | output.resize(1, 0); output[0] = result; ggml_free(ctx); + return true; } -static struct ggml_tensor * create_node(uint64_t id, - struct ggml_context * ctx, - const std::unordered_map & tensor_ptrs, - std::unordered_map & tensor_map) { +ggml_tensor * rpc_server::create_node(uint64_t id, + struct ggml_context * ctx, + const std::unordered_map & tensor_ptrs, + std::unordered_map & tensor_map) { if (id == 0) { return nullptr; } @@ -872,6 +971,9 @@ static struct ggml_tensor * create_node(uint64_t id, } const rpc_tensor * tensor = tensor_ptrs.at(id); struct ggml_tensor * result = deserialize_tensor(ctx, tensor); + if (result == nullptr) { + return nullptr; + } tensor_map[id] = result; for (int i = 0; i < GGML_MAX_SRC; i++) { result->src[i] = create_node(tensor->src[i], ctx, tensor_ptrs, tensor_map); @@ -881,14 +983,23 @@ static struct ggml_tensor * create_node(uint64_t id, return result; } -static void rpc_graph_compute(ggml_backend_t backend, const std::vector & input, std::vector & output) { +bool rpc_server::graph_compute(const std::vector & input, std::vector & output) { // serialization format: // | n_nodes (4 bytes) | nodes (n_nodes * sizeof(uint64_t) | n_tensors (4 bytes) | tensors (n_tensors * sizeof(rpc_tensor)) | + if (input.size() < sizeof(uint32_t)) { + return false; + } uint32_t n_nodes; memcpy(&n_nodes, input.data(), sizeof(n_nodes)); + if (input.size() < sizeof(uint32_t) + n_nodes*sizeof(uint64_t) + sizeof(uint32_t)) { + return false; + } const uint64_t * nodes = (const uint64_t *)(input.data() + sizeof(n_nodes)); uint32_t n_tensors; memcpy(&n_tensors, input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t), sizeof(n_tensors)); + if (input.size() < sizeof(uint32_t) + n_nodes*sizeof(uint64_t) + sizeof(uint32_t) + n_tensors*sizeof(rpc_tensor)) { + return false; + } const rpc_tensor * tensors = (const rpc_tensor *)(input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t) + sizeof(n_tensors)); GGML_PRINT_DEBUG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors); @@ -914,9 +1025,17 @@ static void rpc_graph_compute(ggml_backend_t backend, const std::vector output.resize(1, 0); output[0] = status; ggml_free(ctx); + return true; +} + +rpc_server::~rpc_server() { + for (auto buffer : buffers) { + ggml_backend_buffer_free(buffer); + } } static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t free_mem, size_t total_mem) { + rpc_server server(backend); while (true) { uint8_t cmd; if (!recv_data(sockfd, &cmd, 1)) { @@ -932,45 +1051,46 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre if (!recv_data(sockfd, input.data(), input_size)) { break; } + bool ok = true; switch (cmd) { case ALLOC_BUFFER: { - rpc_alloc_buffer(backend, input, output); + ok = server.alloc_buffer(input, output); break; } case GET_ALIGNMENT: { - rpc_get_alignment(backend, output); + server.get_alignment(output); break; } case GET_MAX_SIZE: { - rpc_get_max_size(backend, output); + server.get_max_size(output); break; } case BUFFER_GET_BASE: { - rpc_buffer_get_base(input, output); + ok = server.buffer_get_base(input, output); break; } case FREE_BUFFER: { - rpc_free_buffer(input); + ok = server.free_buffer(input); break; } case BUFFER_CLEAR: { - rpc_buffer_clear(input); + ok = server.buffer_clear(input); break; } case SET_TENSOR: { - rpc_set_tensor(input); + ok = server.set_tensor(input); break; } case GET_TENSOR: { - rpc_get_tensor(input, output); + ok = server.get_tensor(input, output); break; } case COPY_TENSOR: { - rpc_copy_tensor(input, output); + ok = server.copy_tensor(input, output); break; } case GRAPH_COMPUTE: { - rpc_graph_compute(backend, input, output); + ok = server.graph_compute(input, output); break; } case GET_DEVICE_MEMORY: { @@ -982,9 +1102,12 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre } default: { fprintf(stderr, "Unknown command: %d\n", cmd); - return; + ok = false; } } + if (!ok) { + break; + } uint64_t output_size = output.size(); if (!send_data(sockfd, &output_size, sizeof(output_size))) { break;