#
# GGUF file reading/modification support. For API usage information,
# please see the files in scripts/ for some fairly simple examples.
#
from __future__ import annotations

import os
from collections import OrderedDict
from typing import Any, Literal, NamedTuple, TypeVar, Union

import numpy as np
import numpy.typing as npt

if __name__ == "__main__":
    import sys
    from pathlib import Path

    # Allow running file in package as a script.
    sys.path.insert(0, str(Path(__file__).parent.parent))

from gguf.constants import (
    GGML_QUANT_SIZES,
    GGUF_DEFAULT_ALIGNMENT,
    GGUF_MAGIC,
    GGUF_VERSION,
    GGMLQuantizationType,
    GGUFValueType,
)


READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]


class ReaderField(NamedTuple):
    # NOTE: the list defaults below are created once, when the class is
    # defined, and are therefore shared by every instance that omits them
    # (e.g. the tensor-info fields built by GGUFReader._get_tensor, which do
    # not pass `types`). Treat them as read-only.

    # Offset to start of this field.
    offset: int

    # Name of the field (not necessarily from file data).
    name: str

    # Data parts. Some types have multiple components, such as strings
    # that consist of a length followed by the string data.
    parts: list[npt.NDArray[Any]] = []

    # Indexes into parts that we can call the actual data. For example
    # an array of strings will be populated with indexes to the actual
    # string data.
    data: list[int] = [-1]

    # Value type(s) of the field; for arrays, the ARRAY type followed by the
    # type(s) of the first element.
    types: list[GGUFValueType] = []


class ReaderTensor(NamedTuple):
    name: str
    tensor_type: GGMLQuantizationType
    # NOTE(review): GGUFReader reads the dimensions as uint64, although the
    # annotation says uint32 -- confirm which one is intended.
    shape: npt.NDArray[np.uint32]
    n_elements: int
    n_bytes: int
    data_offset: int
    data: npt.NDArray[Any]
    field: ReaderField


class GGUFReader:
    # I - same as host, S - swapped
    byte_order: Literal['I', 'S'] = 'I'
    alignment: int = GGUF_DEFAULT_ALIGNMENT

    # Note: Internal helper, API may change.
    gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
        GGUFValueType.UINT8:   np.uint8,
        GGUFValueType.INT8:    np.int8,
        GGUFValueType.UINT16:  np.uint16,
        GGUFValueType.INT16:   np.int16,
        GGUFValueType.UINT32:  np.uint32,
        GGUFValueType.INT32:   np.int32,
        GGUFValueType.FLOAT32: np.float32,
        GGUFValueType.UINT64:  np.uint64,
        GGUFValueType.INT64:   np.int64,
        GGUFValueType.FLOAT64: np.float64,
        GGUFValueType.BOOL:    np.bool_,
    }

    def __init__(self, path: os.PathLike[str] | str, mode: Literal['r', 'r+', 'c'] = 'r'):
        """Memory-map the file at `path` and index its metadata and tensors.

        `mode` is passed straight to np.memmap. After construction,
        `self.fields` holds the header and key/value fields keyed by name and
        `self.tensors` holds one ReaderTensor per tensor.

        Raises ValueError when the magic number is wrong, the version is not
        in READER_SUPPORTED_VERSIONS, or `general.alignment` has an
        unexpected type or is zero.
        """
        self.data = np.memmap(path, mode = mode)
        offs = 0

        # The magic is always compared as a little-endian uint32.
        if self._get(offs, np.uint32, override_order = '<')[0] != GGUF_MAGIC:
            raise ValueError('GGUF magic invalid')
        offs += 4

        temp_version = self._get(offs, np.uint32)
        if (temp_version[0] & 65535) == 0:
            # If we get 0 here that means it's (probably) a GGUF file created for
            # the opposite byte order of the machine this script is running on.
            self.byte_order = 'S'
            # ndarray.newbyteorder() was removed in NumPy 2.0; re-viewing the
            # data with a byte-swapped dtype is the documented equivalent.
            temp_version = temp_version.view(temp_version.dtype.newbyteorder(self.byte_order))
        version = temp_version[0]
        if version not in READER_SUPPORTED_VERSIONS:
            raise ValueError(f'Sorry, file appears to be version {version} which we cannot handle')

        self.fields: OrderedDict[str, ReaderField] = OrderedDict()
        self.tensors: list[ReaderTensor] = []

        # Header: version, then tensor count and KV count (both uint64).
        offs += self._push_field(ReaderField(offs, 'GGUF.version', [temp_version], [0], [GGUFValueType.UINT32]))
        temp_counts = self._get(offs, np.uint64, 2)
        offs += self._push_field(ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0], [GGUFValueType.UINT64]))
        offs += self._push_field(ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0], [GGUFValueType.UINT64]))
        # Plain ints keep the offset arithmetic below out of NumPy scalar types.
        tensor_count = int(temp_counts[0])
        kv_count = int(temp_counts[1])

        offs = self._build_fields(offs, kv_count)
        offs, tensors_fields = self._build_tensors_fields(offs, tensor_count)

        new_align = self.fields.get('general.alignment')
        if new_align is not None:
            if new_align.types != [GGUFValueType.UINT32]:
                raise ValueError('Bad type for general.alignment field')
            alignment = int(new_align.parts[-1][0])
            if alignment == 0:
                # Would otherwise end up as a modulo-by-zero just below.
                raise ValueError('Bad value 0 for general.alignment field')
            self.alignment = alignment

        # Tensor data starts at the next multiple of the alignment.
        padding = offs % self.alignment
        if padding != 0:
            offs += self.alignment - padding
        self._build_tensors(offs, tensors_fields)

    _DT = TypeVar('_DT', bound = npt.DTypeLike)

    # Fetch a key/value metadata field by key.
    def get_field(self, key: str) -> Union[ReaderField, None]:
        """Return the field stored under `key`, or None when there is none."""
        return self.fields.get(key, None)

    # Fetch a tensor from the list by index.
    def get_tensor(self, idx: int) -> ReaderTensor:
        """Return the tensor at position `idx` of `self.tensors`."""
        return self.tensors[idx]

    def _get(
        self, offset: int, dtype: npt.DTypeLike, count: int = 1, override_order: None | Literal['I', 'S', '<'] = None,
    ) -> npt.NDArray[Any]:
        """Return `count` items of `dtype` starting at byte `offset`.

        The result is a view into `self.data`, not a copy. Its byte order is
        `override_order` when given, otherwise `self.byte_order`.
        """
        count = int(count)
        itemsize = int(np.empty([], dtype = dtype).itemsize)
        end_offs = offset + itemsize * count
        items = self.data[offset:end_offs].view(dtype = dtype)[:count]
        # ndarray.newbyteorder() no longer exists in NumPy 2.0, so the byte
        # order is changed on the dtype and applied through a view instead.
        return items.view(items.dtype.newbyteorder(override_order or self.byte_order))

    def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
        """Register `field` in `self.fields` and return its size in bytes.

        A field whose name is already present is stored under
        `<name>_<offset>` after printing a warning. Returns 0 instead of the
        summed size of the parts when `skip_sum` is true.
        """
        if field.name in self.fields:
            # TODO: add option to generate error on duplicate keys
            # raise KeyError(f'Duplicate {field.name} already in list at offset {field.offset}')

            print(f'Warning: Duplicate key {field.name} at offset {field.offset}')
            self.fields[f'{field.name}_{field.offset}'] = field
        else:
            self.fields[field.name] = field
        if skip_sum:
            return 0
        return sum(int(part.nbytes) for part in field.parts)

    def _get_str(self, offset: int) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
        """Read a string at `offset`: a uint64 length followed by that many bytes.

        Returns the pair (length array, byte array).
        """
        slen = self._get(offset, np.uint64)
        return slen, self._get(offset + 8, np.uint8, slen[0])

    def _get_field_parts(
        self, orig_offs: int, raw_type: int,
    ) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
        """Read one value of type `raw_type` located at `orig_offs`.

        Returns (size in bytes, parts, indexes of the data parts, types).
        Arrays are read element by element through recursion.

        Raises ValueError for a type this reader cannot handle.
        """
        offs = orig_offs
        types: list[GGUFValueType] = []
        gtype = GGUFValueType(raw_type)
        types.append(gtype)

        # Handle strings.
        if gtype == GGUFValueType.STRING:
            sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
            size = sum(int(part.nbytes) for part in sparts)
            # Part 0 is the length, part 1 the actual string data.
            return size, sparts, [1], types

        # Check if it's a simple scalar type.
        nptype = self.gguf_scalar_to_np.get(gtype)
        if nptype is not None:
            val = self._get(offs, nptype)
            return int(val.nbytes), [val], [0], types

        # Handle arrays.
        if gtype == GGUFValueType.ARRAY:
            # Array header: uint32 item type followed by uint64 item count.
            raw_itype = self._get(offs, np.uint32)
            offs += int(raw_itype.nbytes)
            alen = self._get(offs, np.uint64)
            offs += int(alen.nbytes)
            aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
            data_idxs: list[int] = []
            for item_no in range(alen[0]):
                curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(offs, raw_itype[0])
                if item_no == 0:
                    # Only the first item's types are recorded.
                    types += curr_types
                # Item indexes are relative to the item's own parts; shift
                # them to positions in the combined parts list.
                idxs_offs = len(aparts)
                aparts += curr_parts
                data_idxs += (part_idx + idxs_offs for part_idx in curr_idxs)
                offs += curr_size
            return offs - orig_offs, aparts, data_idxs, types

        # We can't deal with this one.
        raise ValueError(f'Unknown/unhandled field type {gtype}')

    def _get_tensor(self, orig_offs: int) -> ReaderField:
        """Read the tensor-info record at `orig_offs` into a ReaderField.

        The parts are, in order: name length, name bytes, number of
        dimensions, dimensions, raw type and data offset.
        """
        offs = orig_offs
        name_len, name_data = self._get_str(offs)
        offs += int(name_len.nbytes + name_data.nbytes)
        n_dims = self._get(offs, np.uint32)
        offs += int(n_dims.nbytes)
        dims = self._get(offs, np.uint64, n_dims[0])
        offs += int(dims.nbytes)
        raw_dtype = self._get(offs, np.uint32)
        offs += int(raw_dtype.nbytes)
        offset_tensor = self._get(offs, np.uint64)
        offs += int(offset_tensor.nbytes)
        return ReaderField(
            orig_offs,
            str(bytes(name_data), encoding = 'utf-8'),
            [name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
            [1, 3, 4, 5],
        )

    def _build_fields(self, offs: int, count: int) -> int:
        """Read `count` key/value fields starting at `offs` and register them.

        Returns the offset just past the last field.
        """
        for _ in range(count):
            orig_offs = offs
            kv_klen, kv_kdata = self._get_str(offs)
            offs += int(kv_klen.nbytes + kv_kdata.nbytes)
            raw_kv_type = self._get(offs, np.uint32)
            offs += int(raw_kv_type.nbytes)
            parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
            # The value's data indexes must be shifted past the key parts.
            idxs_offs = len(parts)
            field_size, field_parts, field_idxs, field_types = self._get_field_parts(offs, raw_kv_type[0])
            parts += field_parts
            self._push_field(ReaderField(
                orig_offs,
                str(bytes(kv_kdata), encoding = 'utf-8'),
                parts,
                [idx + idxs_offs for idx in field_idxs],
                field_types,
            ), skip_sum = True)
            offs += field_size
        return offs

    def _build_tensors_fields(self, offs: int, count: int) -> tuple[int, list[ReaderField]]:
        """Read `count` tensor-info records starting at `offs`.

        Returns the offset just past the last record and the list of fields.
        """
        tensor_fields = []
        for _ in range(count):
            field = self._get_tensor(offs)
            offs += sum(int(part.nbytes) for part in field.parts)
            tensor_fields.append(field)
        return offs, tensor_fields

    def _build_tensors(self, start_offs: int, fields: list[ReaderField]) -> None:
        """Create `self.tensors` from tensor-info `fields`.

        `start_offs` is the offset the per-tensor data offsets are relative
        to. Raises ValueError when two tensors share a name.
        """
        # Types that are exposed with their natural element dtype; every
        # other type is exposed as raw bytes.
        plain_dtypes: dict[GGMLQuantizationType, npt.DTypeLike] = {
            GGMLQuantizationType.F16: np.float16,
            GGMLQuantizationType.F32: np.float32,
            GGMLQuantizationType.F64: np.float64,
            GGMLQuantizationType.I8:  np.int8,
            GGMLQuantizationType.I16: np.int16,
            GGMLQuantizationType.I32: np.int32,
            GGMLQuantizationType.I64: np.int64,
        }
        tensors = []
        tensor_names = set() # keep track of name to prevent duplicated tensors
        for field in fields:
            _name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
            # check if there's any tensor having same name already in the list
            tensor_name = str(bytes(name_data), encoding = 'utf-8')
            if tensor_name in tensor_names:
                raise ValueError(f'Found duplicated tensor with name {tensor_name}')
            tensor_names.add(tensor_name)
            ggml_type = GGMLQuantizationType(raw_dtype[0])
            # Plain ints: matches the ReaderTensor annotations and keeps the
            # size and offset arithmetic exact instead of relying on NumPy
            # scalar promotion.
            n_elems = int(np.prod(dims))
            block_size, type_size = GGML_QUANT_SIZES[ggml_type]
            n_bytes = n_elems * type_size // block_size
            data_offs = int(start_offs) + int(offset_tensor[0])
            item_type = plain_dtypes.get(ggml_type)
            if item_type is None:
                item_count = n_bytes
                item_type = np.uint8
            else:
                item_count = n_elems
            tensors.append(ReaderTensor(
                name = tensor_name,
                tensor_type = ggml_type,
                shape = dims,
                n_elements = n_elems,
                n_bytes = n_bytes,
                data_offset = data_offs,
                data = self._get(data_offs, item_type, item_count),
                field = field,
            ))
        self.tensors = tensors