"""High-level classes for reading HDF5 files."""
from __future__ import annotations
from collections import deque
from collections.abc import Callable
from collections.abc import Mapping, Sequence
from abc import ABC
import os
import posixpath
import warnings
import logging
import numpy as np
from typing import Any, BinaryIO, cast
from typing_extensions import Self # Python 3.10-compat
from pyfive.core import Reference
from pyfive.dataobjects import DataObjects, DatasetID
from pyfive.misc_low_level import SuperBlock
from pyfive.h5py import Datatype
from pyfive.p5t import P5VlenStringType, P5ReferenceType, P5SequenceType
from pyfive.utilities import MetadataBufferingWrapper
logger = logging.getLogger(__name__)
class Group(Mapping):
    """
    An HDF5 Group which may hold attributes, datasets, or other groups.

    Attributes
    ----------
    attrs : dict
        Attributes for this group.
    name : str
        Full path to this group.
    file : File
        File instance where this group resides.
    parent : Group
        Group instance containing this group.

    """

    def __init__(self, name: str, dataobjects: DataObjects, parent: "Group") -> None:
        """
        Initialize the group.

        Parameters
        ----------
        name : str
            Full path of this group.
        dataobjects : DataObjects
            Object header for the group; its links are read immediately,
            its attributes lazily on first access to ``attrs``.
        parent : Group
            Group containing this group. ``parent.file`` must already be set.

        """
        self.parent = parent
        self.file = parent.file  # type: ignore[has-type]
        self.name = name
        self._links = dataobjects.get_links()
        self._dataobjects = dataobjects
        self._attrs = None  # cached property

    def __repr__(self):
        return '<HDF5 group "%s" (%d members)>' % (self.name, len(self))

    def __len__(self):
        """Number of links in the group."""
        return len(self._links)

    def _dereference(self, ref):
        """
        Dereference a Reference object.

        Raises
        ------
        ValueError
            If the reference is null (falsy) or no object with the referenced
            address is found in the file.

        """
        if not ref:
            raise ValueError("cannot deference null reference")
        obj = self.file._get_object_by_address(ref.address_of_reference)
        if obj is None:
            raise ValueError("reference not found in file")
        return obj

    def __getitem__(self, y):
        """x.__getitem__(y) <==> x[y]."""
        return self.__getitem_lazy_control(y, noindex=False)

    def get_lazy_view(self, y: object) -> object:
        """
        This instantiates the object y, and if it is a
        chunked dataset, does so without reading the b-tree
        index. This is useful for inspecting a variable
        that you are not expecting to access. If you know you
        want to access the data, and in particular, if you are
        going to hand the data to Dask or something else, you
        almost certainly want to read the index now, so
        just do x[y] rather than x.get_lazy_view(y).

        This is a ``pyfive`` extension to the standard h5py API.
        """
        return self.__getitem_lazy_control(y, noindex=True)

    def __getitem_lazy_control(self, y, noindex):
        """
        This is the routine which actually does the get item
        but does it in such a way that we control how much laziness
        is possible where we have chunked variables with b-trees.

        We want to return y, but if y is a chunked dataset we
        normally return it with a cached b-tree (noindex=False).
        If noindex is True, we do not read the b-tree, and that
        will be done when data is first read - which is fine
        in a single-threaded environment, but in a parallel
        environment you only want to read the index once
        (so use noindex=False, which you get via the
        normal getitem interface - x[y]).

        The ``noindex`` choice is carried through every step of the lookup:
        absolute paths, nested paths and soft links.

        Returns
        -------
        Group, Dataset, Datatype or None
            The object at ``y``. ``None`` is returned when the path hits a
            soft link whose target does not exist.

        Raises
        ------
        KeyError
            If a path component is missing, or if a non-final component is
            not a group.

        """
        if isinstance(y, Reference):
            return self._dereference(y)

        path = posixpath.normpath(y)
        if path == ".":
            return self
        if path.startswith("/"):
            # Absolute path: restart from the root, keeping the laziness choice.
            return self.file.__getitem_lazy_control(path[1:], noindex)

        # Split off the first component; "." marks "nothing left to resolve".
        next_obj, _, additional_obj = path.partition("/")
        if not additional_obj:
            additional_obj = "."

        if next_obj not in self._links:
            raise KeyError("%s not found in group" % (next_obj))

        obj_name = posixpath.join(self.name, next_obj)
        link_target = self._links[next_obj]

        if isinstance(link_target, str):
            # Soft link: the stored value is a path, resolved relative to
            # this group.
            try:
                target = self.__getitem_lazy_control(link_target, noindex)
            except KeyError:
                # Dangling soft link.
                return None
            if additional_obj == "." or target is None:
                return target
            if not isinstance(target, Group):
                raise KeyError("%s is not a group" % (obj_name))
            # Continue resolving the rest of the path below the link target.
            return target.__getitem_lazy_control(additional_obj, noindex)

        # Lazy %-style arguments: nothing is formatted unless INFO is enabled.
        logger.info(
            "[pyfive] Accessing object '%s' with link target %s (lazy access: %s)",
            obj_name,
            link_target,
            noindex,
        )

        dataobjs = self.file._get_dataobjects(link_target)
        if dataobjs.is_dataset:
            if additional_obj != ".":
                raise KeyError("%s is a dataset, not a group" % (obj_name))
            return Dataset(obj_name, DatasetID(dataobjs, noindex=noindex), self)

        try:
            # if true, this may well raise a NotImplementedError, if so, we need
            # to warn the user, who may be able to use other parts of the data.
            is_datatype = dataobjs.is_datatype
        except NotImplementedError as e:
            warnings.warn(
                f"Found datatype {obj_name} but pyfive cannot read this data: {e}"
            )
            is_datatype = True

        if is_datatype:
            return Datatype(obj_name, self.file, dataobjs.ptype)

        # A sub-group: resolve whatever is left of the path inside it.
        subgroup = Group(obj_name, dataobjs, self)
        return subgroup.__getitem_lazy_control(additional_obj, noindex)

    def __iter__(self):
        """Iterate over the names of the links in this group."""
        yield from self._links

    def visit(self, func: Callable) -> object:
        """
        Recursively visit all names in the group and subgroups.

        func should be a callable with the signature:

            func(name) -> None or return value

        Returning None continues iteration, return anything else stops and
        return that value from the visit method.
        """
        return self.visititems(lambda name, obj: func(name))

    def visititems(self, func: Callable, noindex: bool = False) -> object:
        """
        Recursively visit all objects in this group and subgroups.

        func should be a callable with the signature:

            func(name, object) -> None or return value

        Returning None continues iteration, return anything else stops and
        return that value from the visit method.

        Use of the optional noindex=True will ensure that
        all operations are not only lazy wrt data, but lazy
        wrt to any chunked data indices, at every depth of the traversal.
        This keyword argument is a ``pyfive`` extension to the standard
        h5py API.

        Links that resolve to ``None`` (dangling soft links) are skipped.
        """
        # Names handed to ``func`` are relative to this group.
        root_name_length = len(self.name)
        if not self.name.endswith("/"):
            root_name_length += 1

        def members(group):
            # Instantiate the members of ``group`` with the requested laziness.
            if noindex:
                return [group.get_lazy_view(key) for key in group]
            return [group[key] for key in group]

        queue = deque(members(self))
        while queue:
            obj = queue.popleft()
            if obj is None:
                # Dangling soft link: there is no object to report.
                continue
            name = obj.name[root_name_length:]  # type: ignore[attr-defined]
            ret = func(name, obj)
            if ret is not None:
                return ret
            if isinstance(obj, Group):
                queue.extend(members(obj))
        return None

    @property
    def attrs(self):
        """attrs attribute (read on first access, then cached)."""
        if self._attrs is None:
            self._attrs = self._dataobjects.get_attributes()
        return self._attrs
class File(Group):
    """
    Open a HDF5 file.

    Note in addition to having file specific methods the File object also
    inherit the full interface of **Group**.

    File is also a context manager and therefore supports the with statement.
    Files opened by the class will be closed after the with block, file-like
    object are not closed.

    Parameters
    ----------
    filename : str or file-like
        Name of file (string or unicode) or file like object which has read
        and seek methods which behaved like a Python file object.
    mode : str
        File open mode (default: "r", read-only).
    metadata_buffer_size : int
        Size of metadata buffer for S3/remote files in MiB (default: 1MiB).
        Larger values reduce network calls but use more memory.
        (This is a pyfive extension for optimizing remote file access, ignored for local files.)

    Attributes
    ----------
    filename : str
        Name of the file on disk. For a file-like object this is its ``name``
        attribute, or the string ``"None"`` when it has none.
    mode : str
        String indicating that the file is open readonly ("r").
    userblock_size : int
        Size of the user block in bytes (currently always 0).

    """

    # Class-level default: ``close`` is also bound to ``__del__``, which runs
    # even when ``__init__`` raised before the instance attribute was assigned.
    _close = False

    def __init__(
        self,
        filename: str | BinaryIO | MetadataBufferingWrapper,
        mode: str = "r",
        metadata_buffer_size: int = 1,
    ) -> None:
        """
        Open the file and read its superblock and root group.

        Raises
        ------
        NotImplementedError
            If ``mode`` is anything other than ``"r"``.
        ValueError
            If a file-like object without a ``seek`` method is given.

        """
        if mode != "r":
            raise NotImplementedError(
                "pyfive only provides support for reading and treats all reads as binary"
            )

        self._close = False
        if hasattr(filename, "read"):
            if not hasattr(filename, "seek"):
                raise ValueError("File like object must have a seek method")
            fh = cast(BinaryIO, filename)
            self.filename = getattr(filename, "name", "None")
        else:
            fh = open(filename, "rb")
            # Only handles opened here are closed by ``close``.
            self._close = True
            self.filename = filename

        self._fh: BinaryIO | MetadataBufferingWrapper
        try:
            # Wrap S3 file handles with metadata buffering to reduce network calls
            if isinstance(fh, MetadataBufferingWrapper):
                # Already wrapped
                self._fh = fh
            elif type(fh).__name__ == "S3File" or hasattr(fh, "fs"):
                # fsspec file handle - wrap with buffering
                # We check for the S3File type by name to avoid a hard dependency on s3fs,
                # but also check for an 'fs' attribute which is common in s3fs file-like objects.
                # This may yet be too broad, but it is unlikely to cause issues for non-S3 files.
                logger.info(
                    "[pyfive] Detected remote file, enabling metadata buffering (%d MB)",
                    metadata_buffer_size,
                )
                self._fh = MetadataBufferingWrapper(
                    fh, buffer_size=metadata_buffer_size
                )
            else:
                # Local file or other
                self._fh = fh

            self._superblock = SuperBlock(self._fh, 0)
            self._dataobjects_cache: dict = {}
            offset = self._superblock.offset_to_dataobjects
            dataobjects = self._get_dataobjects(offset)

            # Group.__init__ reads ``parent.file``, so this must be set first.
            self.file = self
            self.mode = "r"
            self.userblock_size = 0
            super().__init__("/", dataobjects, self)
        except BaseException:
            # Do not leave a handle we opened dangling if parsing fails.
            if self._close:
                self._close = False
                fh.close()
            raise

    @property
    def consolidated_metadata(self) -> bool:
        """Returns True if all B-tree nodes for chunked datasets are located before the first chunk in the file.

        Only the members directly under the root group are inspected. When no
        chunked dataset is found the result is True.
        """
        # for all chunked datasets, check if all btree nodes are located before any dataset chunk
        max_btree, min_chunk = None, None
        for name in self:
            # Instantiate each member once; every lookup builds a new object.
            obj = self[name]
            if not isinstance(obj, Dataset):
                continue
            dataset_id = obj.id
            if dataset_id.layout_class != 2:
                continue
            btree_end = dataset_id.btree_range[1]
            first_chunk = dataset_id.first_chunk
            if max_btree is None or btree_end > max_btree:
                max_btree = btree_end
            if min_chunk is None or first_chunk < min_chunk:
                min_chunk = first_chunk

        if max_btree is None or min_chunk is None:
            return True
        return bool(max_btree < min_chunk)

    def __repr__(self) -> str:
        return '<HDF5 file "%s" (mode r)>' % (os.path.basename(self.filename))

    def _get_dataobjects(self, obj_addr):
        """Return cached DataObjects for an object header address."""
        cached = self._dataobjects_cache.get(obj_addr)
        if cached is not None:
            return cached
        dataobjects = DataObjects(self._fh, obj_addr)
        self._dataobjects_cache[obj_addr] = dataobjects
        return dataobjects

    def _get_object_by_address(self, obj_addr: int) -> Self | Any | None:
        """
        Return the object pointed to by a given address.

        The group hierarchy is searched breadth-first from the root. Soft
        links (stored as path strings) carry no address and are skipped, and
        each address is expanded at most once so that link cycles terminate.

        Returns
        -------
        The instantiated object, or None if no link has that address.

        """
        root_addr = self._dataobjects.offset
        if root_addr == obj_addr:
            return self

        visited = {root_addr}
        queue = deque([self])
        while queue:
            grp = queue.popleft()
            for name, link_addr in grp._links.items():
                if isinstance(link_addr, str):
                    # Soft link: a path, not an object header address.
                    continue
                # check address without instantiating
                if link_addr == obj_addr:
                    # return instantiated object
                    return grp[name]
                if link_addr in visited:
                    continue
                visited.add(link_addr)
                # descend only if it's a subgroup (need to instantiate minimally)
                if self._get_dataobjects(link_addr).is_group:
                    queue.append(grp[name])
        return None

    def close(self):
        """Close the file.

        Only a handle opened by this class is closed; file-like objects passed
        in by the caller are left open. Calling this more than once is safe.
        """
        if self._close:
            self._close = False
            self._fh.close()

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        self.close()
class Dataset(ABC):
    """
    A HDF5 Dataset containing an n-dimensional array and meta-data attributes.

    Attributes
    ----------
    shape : tuple
        Dataset dimensions.
    dtype : dtype
        Dataset's type.
    size : int
        Total number of elements in the dataset.
    chunks : tuple or None
        Chunk shape, or None if chunked storage is not used.
    compression : str or None
        Compression filter used on dataset. None if compression is not enabled
        for this dataset.
    compression_opts : dict or None
        Options for the compression filter.
    scaleoffset : dict or None
        Setting for the HDF5 scale-offset filter, or None if scale-offset
        compression is not used for this dataset.
    shuffle : bool
        Whether the shuffle filter is applied for this dataset.
    fletcher32 : bool
        Whether the Fletcher32 checksumming is enabled for this dataset.
    fillvalue : float or None
        Value indicating uninitialized portions of the dataset. None if no fill
        value has been defined.
    ndim : int
        Number of dimensions.
    dims : DimensionManager
        Dimension scales.
    attrs : dict
        Attributes for this dataset.
    name : str
        Full path to this dataset.
    file : File
        File instance where this dataset resides.
    parent : Group
        Group instance containing this dataset.

    """

    # Dataset is now an ABC to allow for the possibility of other dataset types, which
    # may not support all of the same attributes or methods. Examples already include
    # the ppfive Dataset class, which does not support accessing HDF5, but does
    # support the same interface for accessing data and attributes in pp/fields files.

    def __init__(self, name: str, datasetid: DatasetID, parent: Group) -> None:
        """
        Initialize the dataset.

        Parameters
        ----------
        name : str
            Full path of this dataset.
        datasetid : DatasetID
            Low-level object through which data and metadata are read.
        parent : Group
            Group containing this dataset.

        """
        self.parent = parent
        self.file = parent.file
        self.name = name
        self._attrs = None
        self._astype = None
        # This is the DatasetID instance which provides the actual data
        # access methods.
        self.id = datasetid
        # horrible kludge for now,
        # https://github.com/NCAS-CMS/pyfive/issues/13#issuecomment-2557121461
        # we hide stuff we need here
        self._dataobjects = self.id._meta

    def __repr__(self):
        info = (os.path.basename(self.name), self.shape, self.dtype)
        return '<HDF5 dataset "%s": shape %s, type "%s">' % info

    def __getitem__(self, args):
        """Read the selection ``args``, converted if an astype context is active."""
        data = self.id.get_data(args, self.fillvalue)
        if self._astype is None:
            return data
        return data.astype(self._astype)

    def read_direct(
        self,
        array: np.ndarray,
        source_sel: None | tuple = None,
        dest_sel: None | tuple = None,
    ) -> None:
        """
        Read from a HDF5 dataset directly into a NumPy array.

        This is equivalent to ``array[dest_sel] = dset[source_sel]``; a
        selection left as None means "everything".

        Creation of intermediates is not avoided. This method is provided for
        compatibility with h5py, it is not efficient.
        """
        # None is not a "select all" index (for NumPy it inserts an axis), so
        # map the defaults to explicit whole-array selections.
        source = () if source_sel is None else source_sel
        dest = Ellipsis if dest_sel is None else dest_sel
        array[dest] = self[source]

    def astype(self, dtype: str) -> AstypeContext:
        """
        Return a context manager which returns data as a particular type.

        Conversion is handled by NumPy after reading/extracting the data.
        """
        return AstypeContext(self, dtype)

    def len(self):
        """Return the size of the first axis."""
        return self.shape[0]

    def iter_chunks(self, sel=()):
        """Delegate to ``self.id.iter_chunks`` for the selection ``sel``."""
        return self.id.iter_chunks(sel)

    @property
    def shape(self):
        """shape attribute."""
        return self.id.shape

    @property
    def maxshape(self):
        """maxshape attribute. (None for unlimited dimensions)"""
        return self.id._meta.maxshape

    @property
    def ndim(self):
        """number of dimensions."""
        return len(self.shape)

    @property
    def dtype(self):
        """dtype attribute."""
        return self.id.dtype

    @property
    def value(self):
        """alias for dataset[()]."""
        warnings.warn(
            "dataset.value has been deprecated. Use dataset[()] instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self[()]

    @property
    def size(self):
        """size attribute (a Python int; 1 for a scalar dataset)."""
        # np.prod(()) is the float 1.0, so force an integer result.
        return int(np.prod(self.shape, dtype=np.int64))

    @property
    def chunks(self):
        """chunks attribute."""
        return self.id.chunks

    @property
    def compression(self):
        """compression attribute."""
        return self.id._meta.compression

    @property
    def compression_opts(self):
        """compression_opts attribute."""
        return self.id._meta.compression_opts

    @property
    def scaleoffset(self):
        """scaleoffset attribute."""
        return None  # TODO support scale-offset filter

    @property
    def shuffle(self):
        """shuffle attribute."""
        return self.id._meta.shuffle

    @property
    def fletcher32(self):
        """fletcher32 attribute."""
        return self.id._meta.fletcher32

    @property
    def fillvalue(self):
        """fillvalue attribute."""
        return self.id._meta.fillvalue

    @property
    def dims(self):
        """dims attribute (a new DimensionManager on every access)."""
        return DimensionManager(self)

    @property
    def attrs(self):
        """attrs attribute."""
        return self.id._meta.attributes

    @property
    def __orthogonal_indexing__(self):
        """Flag to indicate whether indexing is orthogonal.

        In general, the flag will be `True` if:

        * The data is chunked.

        * The data is contiguous and memory mapped access is not
          available.

        """
        if self.id.chunks is not None:
            # Chunked data indexed with
            # `DatasetID._get_selection_via_chunks`
            return True

        special_types = (P5ReferenceType, P5VlenStringType, P5SequenceType)
        if not isinstance(self.id._ptype, special_types) and not self.id.posix:
            # Contiguous data indexed with
            # `DatasetID._get_direct_from_contiguous`
            return True

        # All other cases
        return False
class DimensionManager(Sequence):
    """Sequence of the dimensions associated with a dataset.

    One ``DimensionProxy`` is built per (label, references) pair taken from
    the dataset's ``DIMENSION_LABELS`` and ``DIMENSION_LIST`` attributes. A
    missing attribute is replaced by one empty entry per axis.
    """

    def __init__(self, dset):
        rank = len(dset.shape)

        # References to the scales attached to each axis.
        if "DIMENSION_LIST" in dset.attrs:
            scale_refs = dset.attrs["DIMENSION_LIST"]
        else:
            scale_refs = [[]] * rank

        # Label of each axis.
        if "DIMENSION_LABELS" in dset.attrs:
            axis_labels = dset.attrs["DIMENSION_LABELS"]
        else:
            axis_labels = [b""] * rank

        proxies = []
        for axis_label, axis_refs in zip(axis_labels, scale_refs):
            proxies.append(DimensionProxy(dset.file, axis_label, axis_refs))
        self._dims = proxies

    def __len__(self):
        return len(self._dims)

    def __getitem__(self, x):
        return self._dims[x]
class DimensionProxy(Sequence):
    """A single HDF5 "dimension": a label plus references to its scales.

    Indexing looks the stored reference up in the owning file, so items are
    resolved on access rather than at construction.
    """

    def __init__(self, dset_file, label, refs):
        self._file = dset_file
        self._refs = refs
        try:
            # Byte-string labels are decoded to text.
            text = label.decode("utf-8")
        except AttributeError:
            # No ``decode`` method (e.g. already a str): keep the label as is.
            text = label
        self.label = text

    def __len__(self):
        return len(self._refs)

    def __getitem__(self, x):
        reference = self._refs[x]
        return self._file[reference]
class AstypeContext(object):
    """
    Context manager which allows changing the type read from a dataset.

    On entry the dataset's ``_astype`` is set to the requested dtype; on exit
    the value that was in place at entry is restored, so contexts can be
    nested without the inner one cancelling the outer conversion.
    """

    # FIXME:ENUM should this allow a conversion from enum base types to values using dictionary?
    # Probably not, as it would be additional functionality to the h5py interface???

    def __init__(self, dset, dtype):
        self._dset = dset
        self._dtype = np.dtype(dtype)
        # One saved value per active ``with`` on this object.
        self._saved = []

    def __enter__(self):
        self._saved.append(self._dset._astype)
        self._dset._astype = self._dtype

    def __exit__(self, *args):
        # Restore whatever was active before this context was entered.
        self._dset._astype = self._saved.pop() if self._saved else None