# Source code for pyfive.high_level

"""High-level classes for reading HDF5 files."""

from __future__ import annotations
from collections import deque
from collections.abc import Callable
from collections.abc import Mapping, Sequence
from abc import ABC
import os
import posixpath
import warnings
import logging
import numpy as np
from typing import Any, BinaryIO, cast
from typing_extensions import Self  # Python 3.10-compat
from pyfive.core import Reference
from pyfive.dataobjects import DataObjects, DatasetID
from pyfive.misc_low_level import SuperBlock
from pyfive.h5py import Datatype
from pyfive.p5t import P5VlenStringType, P5ReferenceType, P5SequenceType
from pyfive.utilities import MetadataBufferingWrapper

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class Group(Mapping):
    """
    An HDF5 Group which may hold attributes, datasets, or other groups.

    Attributes
    ----------
    attrs : dict
        Attributes for this group.
    name : str
        Full path to this group.
    file : File
        File instance where this group resides.
    parent : Group
        Group instance containing this group.

    """

    def __init__(self, name: str, dataobjects: DataObjects, parent: "Group") -> None:
        """Initialize the group from its data objects and its parent group."""
        self.parent = parent
        self.file = parent.file  # type: ignore[has-type]
        self.name = name
        self._links = dataobjects.get_links()
        self._dataobjects = dataobjects
        self._attrs = None  # cached property

    def __repr__(self):
        return '<HDF5 group "%s" (%d members)>' % (self.name, len(self))

    def __len__(self):
        """Number of links in the group."""
        return len(self._links)

    def _dereference(self, ref):
        """
        Dereference a Reference object.

        Raises ValueError if the reference is null (falsy) or if no object
        with the referenced address is found in the file.
        """
        if not ref:
            raise ValueError("cannot deference null reference")
        obj = self.file._get_object_by_address(ref.address_of_reference)
        if obj is None:
            raise ValueError("reference not found in file")
        return obj

    def __getitem__(self, y):
        """x.__getitem__(y) <==> x[y]."""
        return self.__getitem_lazy_control(y, noindex=False)

    def get_lazy_view(self, y: object) -> object:
        """
        This instantiates the object y, and if it is a chunked dataset,
        does so without reading the b-tree index. This is useful for
        inspecting a variable that you are not expecting to access.
        If you know you want to access the data, and in particular,
        if you are going to hand the data to Dask or something else,
        you almost certainly want to read the index now, so just do
        x[y] rather than x.get_lazy_view(y).

        This is a ``pyfive`` extension to the standard h5py API.
        """
        return self.__getitem_lazy_control(y, noindex=True)

    def __getitem_lazy_control(self, y, noindex):
        """
        This is the routine which actually does the get item but does it
        in such a way that we control how much laziness is possible where
        we have chunked variables with b-trees.

        We want to return y, but if y is a chunked dataset we normally
        return it with a cached b-tree (noindex=False). If noindex is True,
        we do not read the b-tree, and that will be done when data is first
        read - which is fine in a single-threaded environment, but in a
        parallel environment you only want to read the index once (so use
        noindex=False, which you get via the normal getitem interface - x[y]).

        The ``noindex`` flag is carried through every recursive step
        (absolute paths, nested groups and soft links), so a lazy request
        stays lazy however deep the path is.

        Returns None for a soft link whose target does not exist. Raises
        KeyError when a path component is missing or when the path tries
        to descend into something that is not a group.
        """
        if isinstance(y, Reference):
            return self._dereference(y)

        path = posixpath.normpath(y)
        if path == ".":
            return self
        if path.startswith("/"):
            # Absolute path: restart from the file root, keeping the flag.
            return self.file.__getitem_lazy_control(path[1:], noindex)

        # Split off the first path component; "." means nothing remains.
        next_obj, _, additional_obj = path.partition("/")
        if not additional_obj:
            additional_obj = "."

        if next_obj not in self._links:
            raise KeyError("%s not found in group" % (next_obj))

        obj_name = posixpath.join(self.name, next_obj)
        link_target = self._links[next_obj]

        if isinstance(link_target, str):
            # Soft link: the target is a path, resolved relative to this group.
            try:
                target = self.__getitem_lazy_control(link_target, noindex)
            except KeyError:
                # Dangling soft link.
                return None
            if target is None or additional_obj == ".":
                return target
            # Keep resolving the rest of the path below the link target.
            if isinstance(target, Group):
                return target.__getitem_lazy_control(additional_obj, noindex)
            raise KeyError("%s is not a group" % (obj_name))

        logger.info(
            "[pyfive] Accessing object '%s' with link target %s (lazy access: %s)",
            obj_name,
            link_target,
            noindex,
        )

        dataobjs = self.file._get_dataobjects(link_target)
        if dataobjs.is_dataset:
            if additional_obj != ".":
                raise KeyError("%s is a dataset, not a group" % (obj_name))
            return Dataset(obj_name, DatasetID(dataobjs, noindex=noindex), self)

        try:
            # if true, this may well raise a NotImplementedError, if so, we need
            # to warn the user, who may be able to use other parts of the data.
            is_datatype = dataobjs.is_datatype
        except NotImplementedError as e:
            warnings.warn(
                f"Found datatype {obj_name} but pyfive cannot read this data: {e}"
            )
            is_datatype = True

        if is_datatype:
            return Datatype(obj_name, self.file, dataobjs.ptype)

        subgroup = Group(obj_name, dataobjs, self)
        return subgroup.__getitem_lazy_control(additional_obj, noindex)

    def __iter__(self):
        """Iterate over the names of the links in this group."""
        yield from self._links

    def visit(self, func: Callable) -> object:
        """
        Recursively visit all names in the group and subgroups.

        func should be a callable with the signature:

            func(name) -> None or return value

        Returning None continues iteration, return anything else stops and
        return that value from the visit method.

        """
        return self.visititems(lambda name, obj: func(name))

    def visititems(self, func: Callable, noindex: bool = False) -> object:
        """
        Recursively visit all objects in this group and subgroups.

        func should be a callable with the signature:

            func(name, object) -> None or return value

        Returning None continues iteration, return anything else stops and
        return that value from the visit method.

        Use of the optional noindex=True will ensure that all operations are
        not only lazy wrt data, but lazy wrt to any chunked data indices,
        at every level of the hierarchy.
        This keyword argument is a ``pyfive`` extension to the standard h5py API.

        Links that resolve to None (dangling soft links) are skipped.

        """
        root_name_length = len(self.name)
        if not self.name.endswith("/"):
            root_name_length += 1

        def members(group):
            # Use lazy access when requested so that dataset indices are
            # not loaded, for sub-groups as well as for this group.
            getter = group.get_lazy_view if noindex else group.__getitem__
            return (getter(key) for key in group)

        queue = deque(members(self))
        while queue:
            obj = queue.popleft()
            if obj is None:
                # Dangling soft link: nothing to visit.
                continue
            name = obj.name[root_name_length:]  # type: ignore[attr-defined]
            ret = func(name, obj)
            if ret is not None:
                return ret
            if isinstance(obj, Group):
                queue.extend(members(obj))
        return None

    @property
    def attrs(self):
        """attrs attribute (read on first access, then cached)."""
        if self._attrs is None:
            self._attrs = self._dataobjects.get_attributes()
        return self._attrs
class File(Group):
    """
    Open a HDF5 file.

    Note in addition to having file specific methods the File object also
    inherits the full interface of **Group**.

    File is also a context manager and therefore supports the with statement.
    Files opened by the class will be closed after the with block, file-like
    objects are not closed.

    Parameters
    ----------
    filename : str or file-like
        Name of file (string or unicode) or file like object which has read
        and seek methods which behave like a Python file object.
    mode : str
        File open mode (default: "r", read-only). Any other value raises
        NotImplementedError.
    metadata_buffer_size : int
        Size of metadata buffer for S3/remote files in MiB (default: 1MiB).
        Larger values reduce network calls but use more memory.
        (This is a pyfive extension for optimizing remote file access,
        ignored for local files.)

    Attributes
    ----------
    filename : str
        Name of the file on disk; the string "None" when a file-like object
        without a ``name`` attribute was supplied.
    mode : str
        String indicating that the file is open readonly ("r").
    userblock_size : int
        Size of the user block in bytes (currently always 0).

    """

    def __init__(
        self,
        filename: str | BinaryIO | MetadataBufferingWrapper,
        mode: str = "r",
        metadata_buffer_size: int = 1,
    ) -> None:
        """Open the file (or adopt the file-like object) and read the root group."""
        # Set before anything can raise: close() is also bound to __del__
        # and must work on a partially constructed instance.
        self._close = False

        if mode != "r":
            raise NotImplementedError(
                "pyfive only provides support for reading and treats all reads as binary"
            )

        if hasattr(filename, "read"):
            if not hasattr(filename, "seek"):
                raise ValueError("File like object must have a seek method")
            fh = cast(BinaryIO, filename)
            self.filename = getattr(filename, "name", "None")
        else:
            fh = open(filename, "rb")
            self._close = True  # we opened it, so we are responsible for closing it
            self.filename = filename

        # NOTE mypy detects incompatible types:
        # str | BytesIO = MetadataBufferingWrapper
        self._fh: BinaryIO | MetadataBufferingWrapper = fh
        try:
            # Wrap S3 file handles with metadata buffering to reduce network calls.
            # We check for the S3File type by name to avoid a hard dependency on s3fs,
            # but also check for an 'fs' attribute which is common in s3fs file-like objects.
            # This may yet be too broad, but it is unlikely to cause issues for non-S3 files.
            # A handle that is already a MetadataBufferingWrapper is used as is.
            if not isinstance(fh, MetadataBufferingWrapper) and (
                type(fh).__name__ == "S3File" or hasattr(fh, "fs")
            ):
                logger.info(
                    "[pyfive] Detected remote file, enabling metadata buffering (%d MB)",
                    metadata_buffer_size,
                )
                self._fh = MetadataBufferingWrapper(fh, buffer_size=metadata_buffer_size)

            self._superblock = SuperBlock(self._fh, 0)
            self._dataobjects_cache: dict = {}
            offset = self._superblock.offset_to_dataobjects
            dataobjects = self._get_dataobjects(offset)

            # Group.__init__ reads parent.file, so this must be set first.
            self.file = self
            self.mode = "r"
            self.userblock_size = 0
            super().__init__("/", dataobjects, self)
        except Exception:
            # Do not leak a handle we opened ourselves if parsing fails.
            self.close()
            raise

    @property
    def consolidated_metadata(self) -> bool:
        """Returns True if all B-tree nodes for chunked datasets are located before the first chunk in the file.

        Only the members directly in the root group are inspected. Returns
        True when there is no dataset with layout class 2 to compare.
        """
        # for all chunked datasets, check if all btree nodes are located
        # before any dataset chunk
        max_btree, min_chunk = None, None
        for key in self:
            # Instantiate each member once; every lookup builds a new object.
            obj = self[key]
            if not isinstance(obj, Dataset) or obj.id.layout_class != 2:
                continue
            btree_end = obj.id.btree_range[1]
            first_chunk = obj.id.first_chunk
            if max_btree is None or btree_end > max_btree:
                max_btree = btree_end
            if min_chunk is None or first_chunk < min_chunk:
                min_chunk = first_chunk

        if max_btree is not None and min_chunk is not None:
            return max_btree < min_chunk
        return True

    def __repr__(self) -> str:
        return '<HDF5 file "%s" (mode r)>' % (os.path.basename(self.filename))

    def _get_dataobjects(self, obj_addr):
        """Return cached DataObjects for an object header address."""
        cached = self._dataobjects_cache.get(obj_addr)
        if cached is not None:
            return cached
        dataobjects = DataObjects(self._fh, obj_addr)
        self._dataobjects_cache[obj_addr] = dataobjects
        return dataobjects

    def _get_object_by_address(self, obj_addr: int) -> Self | Any | None:
        """
        Return the object pointed to by a given address.

        Performs a breadth-first search over the group hierarchy starting
        at the root. Returns None if no link with that address is found.
        """
        if self._dataobjects.offset == obj_addr:
            return self

        queue = deque([self])
        # Addresses already examined; guards against hard-link cycles.
        visited = {self._dataobjects.offset}
        while queue:
            grp = queue.popleft()
            for name, link_addr in grp._links.items():
                if isinstance(link_addr, str):
                    # Soft link: holds a path, not an object address.
                    continue
                # check address without instantiating
                if link_addr == obj_addr:
                    # return instantiated object
                    return grp[name]
                if link_addr in visited:
                    continue
                visited.add(link_addr)
                # descend only if it's a subgroup (need to instantiate minimally)
                if self._get_dataobjects(link_addr).is_group:
                    queue.append(grp[name])
        return None

    def close(self):
        """Close the file.

        Only handles opened by this class are closed; caller-supplied
        file-like objects are left open. Safe to call more than once and
        on an instance whose construction did not complete.
        """
        if getattr(self, "_close", False):
            self._close = False
            fh = getattr(self, "_fh", None)
            if fh is not None:
                fh.close()

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        self.close()
class Dataset(ABC):
    """
    A HDF5 Dataset containing an n-dimensional array and meta-data attributes.

    Attributes
    ----------
    shape : tuple
        Dataset dimensions.
    dtype : dtype
        Dataset's type.
    size : int
        Total number of elements in the dataset.
    chunks : tuple or None
        Chunk shape, or None if chunked storage is not used.
    compression : str or None
        Compression filter used on dataset.  None if compression is not enabled
        for this dataset.
    compression_opts : dict or None
        Options for the compression filter.
    scaleoffset : dict or None
        Setting for the HDF5 scale-offset filter, or None if scale-offset
        compression is not used for this dataset.
    shuffle : bool
        Whether the shuffle filter is applied for this dataset.
    fletcher32 : bool
        Whether the Fletcher32 checksumming is enabled for this dataset.
    fillvalue : float or None
        Value indicating uninitialized portions of the dataset. None if no fill
        value has been defined.
    ndim : int
        Number of dimensions.
    dims : DimensionManager
        Dimension scales.
    attrs : dict
        Attributes for this dataset.
    name : str
        Full path to this dataset.
    file : File
        File instance where this dataset resides.
    parent : Group
        Group instance containing this dataset.

    """

    # Dataset is now an ABC to allow for the possibility of other dataset types, which
    # may not support all of the same attributes or methods. Examples already include
    # the ppfive Dataset class, which does not support accessing HDF5, but does
    # support the same interface for accessing data and attributes in pp/fields files.

    def __init__(self, name: str, datasetid: DatasetID, parent: Group) -> None:
        """Initialize the dataset from its DatasetID and its parent group."""
        self.parent = parent
        self.file = parent.file
        self.name = name
        self._attrs = None
        self._astype = None
        self.id = datasetid
        """ This is the DatasetID instance which provides the actual data access methods. """

        # horrible kludge for now,
        # https://github.com/NCAS-CMS/pyfive/issues/13#issuecomment-2557121461
        # we hide stuff we need here
        self._dataobjects = self.id._meta

    def __repr__(self):
        info = (os.path.basename(self.name), self.shape, self.dtype)
        return '<HDF5 dataset "%s": shape %s, type "%s">' % info

    def __getitem__(self, args):
        """Read the selection ``args``; convert it if an astype context is active."""
        data = self.id.get_data(args, self.fillvalue)
        if self._astype is None:
            return data
        return data.astype(self._astype)

    def read_direct(
        self,
        array: np.ndarray,
        source_sel: None | tuple = None,
        dest_sel: None | tuple = None,
    ) -> None:
        """
        Read from a HDF5 dataset directly into a NumPy array.

        This is equivalent to array[dest_sel] = dset[source_sel], where a
        selection of None means "everything".

        Creation of intermediates is not avoided. This method is provided for
        compatibility with h5py, it is not efficient.

        """
        # None is not a usable "select all" index, so map it to ().
        src = () if source_sel is None else source_sel
        dst = () if dest_sel is None else dest_sel
        array[dst] = self[src]

    def astype(self, dtype: str) -> AstypeContext:
        """
        Return a context manager which returns data as a particular type.

        Conversion is handled by NumPy after reading the data.
        """
        return AstypeContext(self, dtype)

    def len(self):
        """Return the size of the first axis."""
        return self.shape[0]

    def iter_chunks(self, sel=()):
        """Delegate to ``self.id.iter_chunks`` with the selection ``sel``."""
        return self.id.iter_chunks(sel)

    @property
    def shape(self):
        """shape attribute."""
        return self.id.shape

    @property
    def maxshape(self):
        """maxshape attribute. (None for unlimited dimensions)"""
        return self.id._meta.maxshape

    @property
    def ndim(self):
        """number of dimensions."""
        return len(self.shape)

    @property
    def dtype(self):
        """dtype attribute."""
        return self.id.dtype

    @property
    def value(self):
        """alias for dataset[()] (deprecated)."""
        warnings.warn(
            "dataset.value has been deprecated. Use dataset[()] instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self[()]

    @property
    def size(self):
        """size attribute: total number of elements, as a Python int."""
        # np.prod(()) is the float 1.0, so force an integer result.
        return int(np.prod(self.shape, dtype=np.int64))

    @property
    def chunks(self):
        """chunks attribute."""
        return self.id.chunks

    @property
    def compression(self):
        """compression attribute."""
        return self.id._meta.compression

    @property
    def compression_opts(self):
        """compression_opts attribute."""
        return self.id._meta.compression_opts

    @property
    def scaleoffset(self):
        """scaleoffset attribute."""
        return None  # TODO support scale-offset filter

    @property
    def shuffle(self):
        """shuffle attribute."""
        return self.id._meta.shuffle

    @property
    def fletcher32(self):
        """fletcher32 attribute."""
        return self.id._meta.fletcher32

    @property
    def fillvalue(self):
        """fillvalue attribute."""
        return self.id._meta.fillvalue

    @property
    def dims(self):
        """dims attribute."""
        return DimensionManager(self)

    @property
    def attrs(self):
        """attrs attribute."""
        return self.id._meta.attributes

    @property
    def __orthogonal_indexing__(self):
        """Flag to indicate whether indexing is orthogonal.

        In general, the flag will be `True` if:

        * The data is chunked.

        * The data is contiguous and memory mapped access is not
          available.

        """
        if self.id.chunks is not None:
            # Chunked data indexed with
            # `DatasetID._get_selection_via_chunks`
            return True

        special_types = (P5ReferenceType, P5VlenStringType, P5SequenceType)
        if not isinstance(self.id._ptype, special_types) and not self.id.posix:
            # Contiguous data indexed with
            # `DatasetID._get_direct_from_contiguous`
            return True

        # All other cases
        return False
class DimensionManager(Sequence):
    """Represents a collection of dimensions associated with a dataset."""

    def __init__(self, dset):
        """Build one DimensionProxy per dimension of ``dset``.

        Labels come from the ``DIMENSION_LABELS`` attribute and references
        from the ``DIMENSION_LIST`` attribute when present; otherwise each
        dimension gets an empty label and no references.
        """
        ndim = len(dset.shape)
        attrs = dset.attrs

        if "DIMENSION_LIST" in attrs:
            dim_list = attrs["DIMENSION_LIST"]
        else:
            # Independent empty lists, one per dimension (no aliasing).
            dim_list = [[] for _ in range(ndim)]

        if "DIMENSION_LABELS" in attrs:
            dim_labels = attrs["DIMENSION_LABELS"]
        else:
            dim_labels = [b""] * ndim

        self._dims = [
            DimensionProxy(dset.file, label, refs)
            for label, refs in zip(dim_labels, dim_list)
        ]

    def __len__(self):
        return len(self._dims)

    def __getitem__(self, x):
        return self._dims[x]


class DimensionProxy(Sequence):
    """Represents a HDF5 "dimension"."""

    def __init__(self, dset_file, label, refs):
        """Store the label (decoded to str if it is bytes), refs and file."""
        try:
            # decode a byte string
            label = label.decode("utf-8")
        except AttributeError:
            # str doesn't have a decode method
            pass
        self.label = label
        self._refs = refs
        self._file = dset_file

    def __len__(self):
        return len(self._refs)

    def __getitem__(self, x):
        """Return the object in the file that reference ``x`` points to."""
        return self._file[self._refs[x]]


class AstypeContext:
    """
    Context manager which allows changing the type read from a dataset.

    Entering sets the dataset's ``_astype`` to the requested dtype; exiting
    restores whatever was set before, so contexts can be nested.
    """

    # FIXME:ENUM should this allow a conversion from enum base types to values using dictionary?
    # Probably not, as it would be additional functionality to the h5py interface???

    def __init__(self, dset, dtype):
        self._dset = dset
        self._dtype = np.dtype(dtype)
        # Conversion types that were active when this context was entered;
        # a stack so that re-entering the same context also unwinds correctly.
        self._saved = []

    def __enter__(self):
        self._saved.append(self._dset._astype)
        self._dset._astype = self._dtype

    def __exit__(self, *args):
        # Restore the previous conversion rather than clearing it, so an
        # enclosing astype context keeps working after this one ends.
        self._dset._astype = self._saved.pop() if self._saved else None