diff --git a/ggml-mpi.c b/ggml-mpi.c
index 16a088e57..8bf4468a1 100644
--- a/ggml-mpi.c
+++ b/ggml-mpi.c
@@ -6,84 +6,15 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
 
 #define UNUSED GGML_UNUSED
 
-struct ggml_mpi_tensor_info {
-    int rank;
-};
-
-// ggml_compute_forward_send
-
-static void ggml_mpi_compute_forward_send(
-        struct ggml_tensor * src,
-        const struct ggml_tensor * orig) {
-    UNUSED(orig);
-    GGML_ASSERT(src->type == GGML_TYPE_F32);
-
-    int my_rank;
-    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
-
-    int dst_rank = ((struct ggml_mpi_tensor_info *)src->extra)->rank;
-    // fprintf(stderr, "(%d) Sending to (%d)\n", my_rank, (int)dst->extra);
-    int retval = MPI_Send(src->data, ggml_nelements(src), MPI_FLOAT, dst_rank, 0, MPI_COMM_WORLD);
-    // fprintf(stderr, "(%d) Sent to (%d)\n", my_rank, (int)dst->extra);
-    GGML_ASSERT(retval == MPI_SUCCESS);
-}
-
-// ggml_compute_forward_recv
-
-static void ggml_mpi_compute_forward_recv(
-        struct ggml_tensor * dst,
-        const struct ggml_tensor * orig,
-        const struct ggml_tensor * parent) {
-    UNUSED(parent);
-    UNUSED(orig);
-    GGML_ASSERT(dst->type == GGML_TYPE_F32);
-    MPI_Status status;
-
-    int my_rank;
-    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
-
-    int src_rank = ((struct ggml_mpi_tensor_info *)dst->extra)->rank;
-    // fprintf(stderr, "(%d) Receiving from (%d)\n", my_rank, src_extra);
-    int retval = MPI_Recv(dst->data, ggml_nelements(dst), MPI_FLOAT, src_rank, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
-    // fprintf(stderr, "(%d) Received from (%d)\n", my_rank, src_extra);
-    GGML_ASSERT(retval == MPI_SUCCESS);
-}
-
-struct ggml_tensor * ggml_mpi_send_tensor(
-        struct ggml_context * ctx,
-        struct ggml_tensor * src,
-        int dst_rank) {
-    struct ggml_tensor * result = ggml_map_custom1_inplace_f32(ctx, src, ggml_mpi_compute_forward_send);
-
-    // TODO how/when to free this struct?
-    struct ggml_mpi_tensor_info *info = calloc(1, sizeof(struct ggml_mpi_tensor_info));
-    info->rank = dst_rank;
-    result->extra = info;
-
-    return result;
-}
-
-struct ggml_tensor * ggml_mpi_recv_tensor(
-        struct ggml_context * ctx,
-        struct ggml_tensor * parent,
-        struct ggml_tensor * dst,
-        int src_rank) {
-    struct ggml_tensor * result = ggml_map_custom2_inplace_f32(ctx, dst, parent, ggml_mpi_compute_forward_recv);
-
-    // TODO how/when to free this struct?
-    struct ggml_mpi_tensor_info *info = calloc(1, sizeof(struct ggml_mpi_tensor_info));
-    info->rank = src_rank;
-    result->extra = info;
-
-    return result;
-}
-
 struct ggml_mpi_context {
-    int mpi_rank;
-    int mpi_size;
+    int rank;
+    int size;
 };
 
 void ggml_mpi_backend_init(void) {
@@ -97,8 +28,8 @@ void ggml_mpi_backend_free(void) {
 struct ggml_mpi_context * ggml_mpi_init(void) {
     struct ggml_mpi_context * ctx = calloc(1, sizeof(struct ggml_mpi_context));
 
-    MPI_Comm_rank(MPI_COMM_WORLD, &ctx->mpi_rank);
-    MPI_Comm_size(MPI_COMM_WORLD, &ctx->mpi_size);
+    MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
 
     return ctx;
 }
@@ -108,17 +39,15 @@ void ggml_mpi_free(struct ggml_mpi_context * ctx) {
 }
 
 int ggml_mpi_rank(struct ggml_mpi_context * ctx) {
-    return ctx->mpi_rank;
+    return ctx->rank;
 }
 
-struct ggml_tensor * ggml_mpi_eval_init(
+void ggml_mpi_eval_init(
         struct ggml_mpi_context * ctx_mpi,
-        struct ggml_context * ctx,
-        int n_embd,
         int * n_tokens,
         int * n_past,
         int * n_threads) {
-    struct ggml_tensor * res = NULL;
+    UNUSED(ctx_mpi);
 
     // synchronize the worker node parameters with the root node
     MPI_Barrier(MPI_COMM_WORLD);
@@ -126,21 +55,132 @@ struct ggml_tensor * ggml_mpi_eval_init(
     MPI_Bcast(n_tokens, 1, MPI_INT, 0, MPI_COMM_WORLD);
     MPI_Bcast(n_past, 1, MPI_INT, 0, MPI_COMM_WORLD);
     MPI_Bcast(n_threads, 1, MPI_INT, 0, MPI_COMM_WORLD);
+}
 
-    if (ctx_mpi->mpi_rank > 0) {
-        res = ggml_mpi_recv_tensor(ctx, NULL,
-                ggml_new_tensor_2d(ctx, GGML_TYPE_F32, n_embd, *n_tokens), ctx_mpi->mpi_rank - 1);
-        ggml_set_name(res, "mpi_recv");
+// look up the graph node named `name`; returns its index in gf->nodes, or -1 (after logging to stderr) if absent
+int ggml_graph_get_node_idx(struct ggml_cgraph * gf, const char * name) {
+    struct ggml_tensor * t = ggml_graph_get_tensor(gf, name);
+    if (t == NULL) {
+        fprintf(stderr, "%s: tensor %s not found\n", __func__, name);
+        return -1;
     }
 
-    return res;
+    for (int i = 0; i < gf->n_nodes; i++) {
+        if (gf->nodes[i] == t) {
+            return i;
+        }
+    }
+
+    fprintf(stderr, "%s: tensor %s not found in graph (should not happen)\n", __func__, name);
+    return -1;
 }
 
 void ggml_mpi_graph_compute(
         struct ggml_mpi_context * ctx_mpi,
+        struct ggml_context * ctx,
         struct ggml_cgraph * gf,
-        int n_layers,
-        int n_embd,
-        int n_tokens) {
+        int n_layers) {
+    const int mpi_rank = ctx_mpi->rank;
+    const int mpi_size = ctx_mpi->size;
 
+    struct ggml_tensor * embd = ggml_graph_get_tensor(gf, "layer_inp_0");
+    if (embd == NULL) {
+        fprintf(stderr, "%s: tensor 'layer_inp_0' not found\n", __func__);
+        return;
+    }
+
+    GGML_ASSERT(embd == gf->nodes[0]);
+
+    // distribute the compute graph into slices across the MPI nodes
+    //
+    // the main node (0) processes the last layers + the remainder of the compute graph
+    // and is responsible to pass the input embeddings to the first node (1)
+    //
+    // node 1:   [(  0) * n_per_node, (  1) * n_per_node)
+    // node 2:   [(  1) * n_per_node, (  2) * n_per_node)
+    // ...
+    // node n-1: [(n-2) * n_per_node, (n-1) * n_per_node)
+    // node 0:   [(n-1) * n_per_node,            n_nodes)
+    //
+    if (mpi_rank > 0) {
+        // recv input data for each node into the "embd" tensor (i.e. the first node in the compute graph)
+        {
+            MPI_Status status; UNUSED(status);
+
+            const int mpi_rank_src = mpi_rank - 1;
+
+            // fprintf(stderr, "(%d) Receiving from (%d)\n", mpi_rank, mpi_rank_src);
+            // transfer the tensor payload (embd->data), not the tensor header; MPI_FLOAT assumes F32 elements - TODO confirm
+            const int retval = MPI_Recv(embd->data, ggml_nelements(embd), MPI_FLOAT, mpi_rank_src, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+            GGML_ASSERT(retval == MPI_SUCCESS);
+            // fprintf(stderr, "(%d) Received from (%d)\n", mpi_rank, mpi_rank_src);
+        }
+    } else if (mpi_size > 1) {
+        // node 0 sends the input data to node 1 (skipped for a single process: there is no rank 1 to talk to)
+        {
+            const int mpi_rank_dst = mpi_rank + 1;
+
+            const int retval = MPI_Send(embd->data, ggml_nelements(embd), MPI_FLOAT, mpi_rank_dst, 0, MPI_COMM_WORLD);
+            GGML_ASSERT(retval == MPI_SUCCESS);
+            // fprintf(stderr, "(%d) Sent to (%d)\n", mpi_rank, mpi_rank_dst);
+        }
+
+        // recv the output data from the last node
+        {
+            MPI_Status status; UNUSED(status);
+
+            const int mpi_rank_src = mpi_size - 1;
+
+            const int retval = MPI_Recv(embd->data, ggml_nelements(embd), MPI_FLOAT, mpi_rank_src, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+            GGML_ASSERT(retval == MPI_SUCCESS);
+        }
+    }
+
+    {
+        const int n_per_node = (n_layers + (mpi_size - 1)) / mpi_size;
+
+        const int mpi_idx = mpi_rank > 0 ? mpi_rank - 1 : mpi_size - 1;
+
+        const int il0 = (mpi_idx + 0) * n_per_node;
+        const int il1 = MIN(n_layers, (mpi_idx + 1) * n_per_node);
+
+        char name_l0[64];
+        char name_l1[64];
+
+        snprintf(name_l0, sizeof(name_l0), "layer_inp_%d", il0);
+        snprintf(name_l1, sizeof(name_l1), "layer_inp_%d", il1);
+
+        const int idx_l0 = ggml_graph_get_node_idx(gf, name_l0);
+        const int idx_l1 = mpi_rank > 0 ? ggml_graph_get_node_idx(gf, name_l1) : gf->n_nodes;
+
+        if (idx_l0 < 0 || idx_l1 < 0) {
+            fprintf(stderr, "%s: layer input nodes not found\n", __func__);
+            return;
+        }
+
+        // attach the input data to the first layer for this node
+        gf->nodes[idx_l0 + 1]->src0 = gf->nodes[1]->src0;
+        gf->nodes[idx_l0 + 1]->src1 = gf->nodes[1]->src1;
+
+        memcpy(gf->nodes[idx_l0 + 1]->opt, gf->nodes[1]->opt, sizeof(gf->nodes[idx_l0 + 1]->opt));
+
+        for (int i = 1; i < idx_l1 - idx_l0; i++) {
+            gf->nodes[i] = gf->nodes[idx_l0 + i];
+            gf->grads[i] = gf->grads[idx_l0 + i];
+        }
+
+        gf->n_nodes = idx_l1 - idx_l0;
+    }
+
+    ggml_graph_compute(ctx, gf);
+
+    // send the output data to the next node
+    if (mpi_rank > 0) {
+        struct ggml_tensor * output = gf->nodes[gf->n_nodes - 1];
+
+        const int mpi_rank_dst = (mpi_rank + 1) % mpi_size;
+
+        const int retval = MPI_Send(output->data, ggml_nelements(output), MPI_FLOAT, mpi_rank_dst, 0, MPI_COMM_WORLD);
+        GGML_ASSERT(retval == MPI_SUCCESS);
+    }
 }
diff --git a/ggml-mpi.h b/ggml-mpi.h
index fc3d0ce51..02e125cfb 100644
--- a/ggml-mpi.h
+++ b/ggml-mpi.h
@@ -8,16 +8,6 @@ struct ggml_cgraph;
 extern "C" {
 #endif
 
-struct ggml_tensor * ggml_mpi_send_tensor(
-        struct ggml_context * ctx,
-        struct ggml_tensor * src,
-        int dst_rank);
-struct ggml_tensor * ggml_mpi_recv_tensor(
-        struct ggml_context * ctx,
-        struct ggml_tensor * parent,
-        struct ggml_tensor * dst,
-        int src_rank);
-
 struct ggml_mpi_context;
 
 void ggml_mpi_backend_init(void);
@@ -28,20 +18,17 @@ void ggml_mpi_free(struct ggml_mpi_context * ctx);
 
 int ggml_mpi_rank(struct ggml_mpi_context * ctx);
 
-struct ggml_tensor * ggml_mpi_eval_init(
+void ggml_mpi_eval_init(
         struct ggml_mpi_context * ctx_mpi,
-        struct ggml_context * ctx,
-        int n_embd,
         int * n_tokens,
         int * n_past,
         int * n_threads);
 
 void ggml_mpi_graph_compute(
         struct ggml_mpi_context * ctx_mpi,
+        struct ggml_context * ctx,
         struct ggml_cgraph * gf,
-        int n_layers,
-        int n_embd,
-        int n_tokens);
+        int n_layers);
 
 #ifdef __cplusplus
 }
diff --git a/llama.cpp b/llama.cpp
index 88ccd4999..fa8030c36 100644
--- a/llama.cpp
+++ b/llama.cpp
@@ -1332,15 +1332,11 @@ static bool llama_eval_internal(
     struct ggml_tensor * inpL;
 
 #ifdef GGML_USE_MPI
-    inpL = ggml_mpi_eval_init(lctx.ctx_mpi, ctx0, n_embd, &n_tokens, &n_past, &n_threads);
-
-    if (inpL) {
-        // only rank 0 loads uses the input
-    } else
+    ggml_mpi_eval_init(lctx.ctx_mpi, &n_tokens, &n_past, &n_threads);
 #endif
+
     if (tokens) {
         struct ggml_tensor * embd = ggml_new_tensor_1d(ctx0, GGML_TYPE_I32, N);
-        ggml_set_name(embd, "embd");
         memcpy(embd->data, tokens, N*ggml_element_size(embd));
         inpL = ggml_get_rows(ctx0, model.tok_embeddings, embd);
     } else {
@@ -1348,6 +1344,8 @@ static bool llama_eval_internal(
         memcpy(inpL->data, embd, N * n_embd * ggml_element_size(inpL));
     }
 
+    ggml_set_name(inpL, "embd");
+
     const int i_gpu_start = n_layer - n_gpu_layers;
     (void) i_gpu_start;
 
@@ -1638,7 +1636,7 @@ static bool llama_eval_internal(
         ggml_graph_compute(ctx0, &gf);
     }
 #elif GGML_USE_MPI
-    ggml_mpi_graph_compute(lctx.ctx_mpi, &gf, n_layer, n_embd, n_tokens);
+    ggml_mpi_graph_compute(lctx.ctx_mpi, ctx0, &gf, n_layer);
 #else
     ggml_graph_compute(ctx0, &gf);
 #endif
@@ -2716,7 +2714,7 @@ struct llama_context * llama_new_context_with_model(
 
     if (ggml_mpi_rank(ctx->ctx_mpi) > 0) {
         // Enter a blocking eval loop with dummy input, letting rank=0 drive the process
-        const std::vector<llama_token> tmp = { llama_token_bos(), };
+        const std::vector<llama_token> tmp(ctx->model.hparams.n_ctx, llama_token_bos());
         while (!llama_eval(ctx, tmp.data(), tmp.size(), 0, 0)) {};
         llama_backend_free();
         exit(1);