import numpy as np
from collections import namedtuple
from operator import mul
from pyfive.indexing import OrthogonalIndexer, ZarrArrayStub
from pyfive.btree import BTreeV1RawDataChunks
from pyfive.core import Reference, UNDEFINED_ADDRESS
from pyfive.misc_low_level import (
get_vlen_string_data_contiguous,
get_vlen_string_data_from_chunk,
_decode_array,
dtype_replace_refs_with_object,
)
from pyfive.p5t import P5CompoundType, P5VlenStringType, P5ReferenceType, P5SequenceType
from io import UnsupportedOperation
from time import time
import os
import struct
import logging
from importlib.metadata import version
from concurrent.futures import ThreadPoolExecutor
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# Per-chunk index record: the chunk key, its filter mask, the byte address of
# the chunk in the file and the stored size in bytes.
StoreInfo = namedtuple(
    "StoreInfo", ["chunk_offset", "filter_mask", "byte_offset", "size"]
)

# Parameters cached at construction time so the chunk b-tree can be read later.
ChunkIndex = namedtuple("ChunkIndex", ["chunk_address", "chunk_dims"])
class ChunkRead:
"""
Mixin providing parallel and bulk chunk-reading strategies.
``DatasetID`` inherits from this class so that the hot path in
``_get_selection_via_chunks`` can dispatch to the best available I/O strategy:
* **Case A - fsspec ``cat_ranges``**: a single bulk request issued to the
filesystem; ideal for remote stores (S3, GCS, https, …).
* **Case B - ``os.pread`` thread pool**: parallel POSIX reads sharing a
single file descriptor without seek contention.
* **Case C - serial fallback**: safe for in-memory buffers and any
custom file-like wrapper.
"""
# ------------------------------------------------------------------ #
# Shared helpers #
# ------------------------------------------------------------------ #
def set_parallelism(
    self, thread_count=0, cat_range_allowed=True, btree_parallel=False
):
    """
    Configure chunk-read parallelism. This is a ``pyfive`` API extension.

    ``thread_count`` controls POSIX threaded reads via ``os.pread``:

    - ``0`` (the default) or ``None`` disables threaded reads
    - ``>0`` enables threaded reads with that many workers

    ``cat_range_allowed`` enables fsspec bulk reads via ``cat_ranges``
    for compatible non-posix file handles. Default True.

    ``btree_parallel`` enables parallel reads for b-tree nodes when
    building the chunk index. Default False.

    The defaults therefore leave ``cat_ranges`` on (useful for remote
    files) and threads off (local files).

    Raises ``ValueError`` if ``thread_count`` is negative.
    """
    workers = 0 if thread_count is None else int(thread_count)
    if workers < 0:
        raise ValueError("thread_count must be >= 0")

    self._thread_count = workers
    self._cat_range_allowed = cat_range_allowed
    self._btree_parallel = btree_parallel

    logger.info(
        "Parallelism: thread_count=%d, cat_range_allowed=%s, btree_parallel=%s",
        self._thread_count,
        self._cat_range_allowed,
        self._btree_parallel,
    )
def _get_required_chunks(self, indexer):
    """
    Expand *indexer* into the list of chunks needed for a selection.

    Each item yielded by *indexer* carries chunk-grid coordinates; these are
    scaled by ``self.chunks`` to obtain the key used in ``self._index``.

    Returns a list of
    ``(chunk_coords, chunk_selection, out_selection, storeinfo)`` tuples,
    in the order produced by *indexer*.
    """
    required = []
    for grid_coords, chunk_selection, out_selection in indexer:
        # Convert chunk-grid position into the index key for that chunk.
        origin = tuple(n * extent for n, extent in zip(grid_coords, self.chunks))
        required.append((origin, chunk_selection, out_selection, self._index[origin]))
    return required
def _decode_chunk(self, chunk_buffer, filter_mask, dtype):
    """
    Turn the raw bytes of one chunk into an array of shape ``self.chunks``.

    When a filter pipeline is present the buffer is first passed through
    ``BTreeV1RawDataChunks._filter_chunk``; the result is then interpreted
    as *dtype* and reshaped using the dataset's memory order.
    """
    pipeline = self.filter_pipeline
    if pipeline is not None:
        chunk_buffer = BTreeV1RawDataChunks._filter_chunk(
            chunk_buffer, filter_mask, pipeline, self.dtype.itemsize
        )
    flat = np.frombuffer(chunk_buffer, dtype=dtype)
    return flat.reshape(self.chunks, order=self._order)
def _select_chunks(self, indexer, out, dtype):
    """
    Collect required chunks and dispatch I/O to the best strategy.

    Called by ``_get_selection_via_chunks`` in place of the serial loop.
    *out* is filled in place; nothing is returned. If the selection needs
    no chunks the method returns immediately without logging.

    Strategies are tried in this order:

    * fsspec ``cat_ranges`` for non-posix handles (if allowed),
    * ``os.pread`` thread pool for posix files (if ``thread_count`` != 0),
    * serial reads otherwise.
    """
    chunks = self._get_required_chunks(indexer)
    if not chunks:
        return

    # Case A: fsspec - bulk parallel fetch via cat_ranges
    if not self.posix and self._cat_range_allowed:
        fh = self._fh
        actual_fh = getattr(fh, "fh", fh)  # support wrapped file-like objects
        if hasattr(actual_fh, "fs") and hasattr(actual_fh.fs, "cat_ranges"):
            # Lazy %-style arguments, consistent with the other log calls here.
            logger.info(
                "[pyfive] chunk read strategy: fsspec_cat_ranges (%d chunks)",
                len(chunks),
            )
            self._read_bulk_fsspec(fh, chunks, out, dtype)
            return

    # Case B: POSIX - thread-parallel reads via os.pread
    if self.posix and hasattr(os, "pread") and self._thread_count != 0:
        logger.info(
            "[pyfive] chunk read strategy: posix_pread_threads workers=%s (%d chunks)",
            self._thread_count,
            len(chunks),
        )
        self._read_parallel_threads(chunks, out, dtype)
        return

    # Case C: serial fallback
    logger.info("[pyfive] chunk read strategy: serial (%d chunks)", len(chunks))
    self._read_serial(chunks, out, dtype)
# ------------------------------------------------------------------ #
# Strategy implementations #
# ------------------------------------------------------------------ #
def _read_serial(self, chunks, out, dtype):
    """
    Read one chunk at a time (safe for any file-like object).

    *chunks* is the list produced by ``_get_required_chunks``; each chunk is
    read with ``seek``/``read``, decoded, and the selected part copied into
    *out* in place.

    For posix files ``self._fh`` is a freshly opened handle, so it is closed
    here even if a read or decode raises.
    """
    fh = self._fh
    try:
        for _coords, chunk_sel, out_sel, storeinfo in chunks:
            fh.seek(storeinfo.byte_offset)
            chunk_buffer = fh.read(storeinfo.size)
            out[out_sel] = self._decode_chunk(
                chunk_buffer, storeinfo.filter_mask, dtype
            )[chunk_sel]
    finally:
        if self.posix:
            fh.close()
def _read_parallel_threads(self, chunks, out, dtype):
    """
    Thread-parallel read via ``os.pread``.

    ``os.pread`` takes an explicit offset and does not move the file
    position, so one descriptor is shared by all workers. All raw buffers
    are fetched first; decoding and copying into *out* happen afterwards
    in the calling thread.
    """

    def fetch(item):
        # Worker: read the raw bytes for one chunk, keeping what decoding needs.
        _coords, chunk_sel, out_sel, storeinfo = item
        raw = os.pread(fd, storeinfo.size, storeinfo.byte_offset)
        return chunk_sel, out_sel, storeinfo.filter_mask, raw

    with open(self._filename, "rb") as handle:
        fd = handle.fileno()
        with ThreadPoolExecutor(max_workers=self._thread_count) as pool:
            fetched = list(pool.map(fetch, chunks))

    logger.info(
        "pyfive thread pool read completed using %d threads", self._thread_count
    )

    for chunk_sel, out_sel, filter_mask, raw in fetched:
        decoded = self._decode_chunk(raw, filter_mask, dtype)
        out[out_sel] = decoded[chunk_sel]
def _read_bulk_fsspec(self, fh, chunks, out, dtype):
    """
    Bulk read via ``fsspec`` ``cat_ranges``.

    All required byte ranges are passed to a single ``cat_ranges`` call on
    the filesystem behind *fh* (unwrapping one ``.fh`` level if *fh* is a
    wrapper). The returned buffers are decoded in order and copied into
    *out* in place.
    """
    actual_fh = getattr(fh, "fh", fh)  # support wrapped file-like objects

    starts = []
    stops = []
    for _coords, _chunk_sel, _out_sel, storeinfo in chunks:
        starts.append(storeinfo.byte_offset)
        stops.append(storeinfo.byte_offset + storeinfo.size)

    paths = [actual_fh.path] * len(chunks)
    buffers = actual_fh.fs.cat_ranges(paths, starts, stops)

    for chunk, raw in zip(chunks, buffers):
        _coords, chunk_sel, out_sel, storeinfo = chunk
        decoded = self._decode_chunk(raw, storeinfo.filter_mask, dtype)
        out[out_sel] = decoded[chunk_sel]
[docs]
class DatasetID(ChunkRead):
"""
Implements an "HDF5 dataset identifier", which despite the name, actually
represents the data of a dataset in a file, and not an identifier. It includes all
the low level methods for working with chunked data, lazily or not.
This class has been deliberately implemented in such as way so as to cache all
the relevant metadata, so that once you have an instance,
it is completely independent of the parent file, and it can be used
efficiently in distributed threads without thread contention to the b-tree etc.
*This behaviour may differ from* ``h5py``, *which cannot isolate the dataset access
from the parent file access as both share underlying C-structures.*
"""
def __init__(
self,
dataobject: "DataObjects", # type: ignore[name-defined] # noqa: F821
noindex: bool = False,
pseudo_chunking_size_MB: int = 4,
) -> None:
"""
Instantiated with the ``pyfive`` ``datasetdataobject``, we copy and cache everything
we want so that the only file operations are now data accesses.
noindex provides a method for controlling how lazy the data load
actually is. This version supports values of False (normal behaviour
index is read when datasetid first instantiated) or True (index
is only read when the data is accessed).
if ``pseudo_chunking_size_MB`` is set to a value greater than zero, and
if the storage is not local posix (and hence ``np.mmap``is not available) then
when accessing contiguous variables, we attempt to find a suitable
chunk shape to approximate that volume and read the contigous variable
as if were chunked. This is to facilitate lazy loading of partial data
from contiguous storage.
(Currently the only way to change this value is by explicitly using
the ``set_pseudo_chunk_size method``. Most users will not need to change
it.)
"""
self._order = dataobject.order
fh = dataobject.fh
try:
# See if 'fh' is an underlying file descriptor
fh.fileno()
except (AttributeError, OSError):
# No file descriptor => Not Posix
self.posix = False
self.__fh = fh
self.pseudo_chunking_size = pseudo_chunking_size_MB * 1024 * 1024
try:
# maybe this is an S3File instance?
self._filename = getattr(fh, "path")
except:
# maybe a remote https file opened as bytes?
# failing that, maybe a memory file, return as None
# or even a Pyfive Dataset instance
self._filename = getattr(fh, "full_name", "None")
if self._filename == "None":
fh = getattr(fh, "fh", None)
if fh is not None:
self._filename = fh.path
else:
# Has a file descriptor => Posix
self.posix = True
self._filename = fh.name
self.pseudo_chunking_size = 0
self.filter_pipeline = dataobject.filter_pipeline
self.shape = dataobject.shape
self.rank = len(self.shape)
self.chunks = dataobject.chunks
self.set_parallelism()
# experimental code. We need to find out whether or not this
# is unnecessary duplication. At the moment it seems best for
# each variable to have it's own copy of those needed for
# data access. Though that's clearly not optimal if they include
# other data. To be determined.
self._global_heaps: dict = {}
self._msg_offset, self.layout_class, self.property_offset = (
dataobject.get_id_storage_params()
)
self._unique = (self._filename, self.shape, self._msg_offset)
self._ptype = dataobject.ptype
self._meta = DatasetMeta(dataobject)
self._index = None
self.__index_built = False
self._index_params = None
# throws a flake8 wobbly for Python<3.10; match is Py3.10+ syntax
match self.layout_class: # noqa
case 0: # compact storage
self._data = self._get_compact_data(dataobject)
case 1: # contiguous storage
(self.data_offset,) = struct.unpack_from(
"<Q", dataobject.msg_data, self.property_offset
)
case 2: # chunked storage
self._index_params = ChunkIndex(
dataobject._chunk_address, dataobject._chunk_dims
)
if not noindex:
self._build_index()
def __hash__(self):
    """
    Hash of the ``_unique`` tuple: file path, data shape and the location
    of the data in the file, which together are assumed to be unique.
    """
    identity = self._unique
    return hash(identity)
def __eq__(self, other):
    """
    Equality is based on the filename, location of the data in the file
    and the shape of the data (the ``_unique`` tuple).

    Returns ``NotImplemented`` when *other* carries no ``_unique``
    attribute, so comparing with an unrelated object yields ``False``
    from Python's fallback instead of raising ``AttributeError``.
    """
    try:
        other_identity = other._unique
    except AttributeError:
        return NotImplemented
    return self._unique == other_identity
def __chunk_init_check(self):
    """
    Guard used by the chunk methods.

    Raises ``TypeError`` if the dataset is not chunked. Otherwise reads
    ``self.index`` (which builds the index on demand) and returns ``True``
    when it is non-empty, ``False`` when it is empty.
    """
    if self.layout_class != 2:
        raise TypeError("Dataset is not chunked ")
    return self.index != {}
[docs]
def get_chunk_info(self, index):
    """
    Retrieve storage information about a chunk specified by its index,
    i.e. its position in the order the chunks were added to the index.

    Raises ``TypeError`` if the dataset is not chunked or the chunk index
    is empty.
    """
    if not self.__chunk_init_check():
        raise TypeError("Dataset is not chunked ")
    chunk_key = self._nthindex[index]
    return self._index[chunk_key]
[docs]
def get_chunk_info_by_coord(self, coordinate_index):
    """
    Retrieve information about a chunk specified by the array address of
    the chunk's first element in each dimension.

    Raises ``TypeError`` if the dataset is not chunked or the chunk index
    is empty.
    """
    if not self.__chunk_init_check():
        raise TypeError("Dataset is not chunked ")
    return self._index[coordinate_index]
[docs]
def get_num_chunks(self):
    """
    Return the total number of chunks in the dataset (``0`` when the chunk
    index is empty).
    """
    return len(self._index) if self.__chunk_init_check() else 0
[docs]
def read_direct_chunk(self, chunk_position, **kwargs):
    """
    Return a ``(filter_mask, raw_bytes)`` tuple for the chunk whose first
    element is at *chunk_position*. The bytes are exactly as stored.

    Additional arguments supported by ``h5py`` are accepted but ignored.

    Raises ``TypeError`` if the dataset is not chunked or has an empty
    index, and ``OSError`` if *chunk_position* is not a key of the index.
    """
    if not self.__chunk_init_check():
        raise TypeError("Dataset is not chunked ")
    storeinfo = self._index.get(chunk_position)
    if storeinfo is None:
        raise OSError("Chunk coordinates must lie on chunk boundaries")
    return storeinfo.filter_mask, self._get_raw_chunk(storeinfo)
[docs]
def get_data(self, args, fillvalue):
"""Called by the dataset getitem method"""
# throws a flake8 wobbly for Python<3.10; match is Py3.10+ syntax
no_storage = False
match self.layout_class: # noqa
case 0: # compact storage
if self._data is None:
no_storage = True
else:
return self._read_compact_data(args)
case 1: # contiguous storage
if self.data_offset == UNDEFINED_ADDRESS:
no_storage = True
else:
return self._get_contiguous_data(args, fillvalue)
case 2: # chunked storage
if not self.__index_built:
self._build_index()
if not self._index:
no_storage = True
else:
if isinstance(self._ptype, P5ReferenceType):
# references need to read all the chunks for now
return self._get_selection_via_chunks(())[args]
else:
# this is lazily reading only the chunks we need
return self._get_selection_via_chunks(args)
if no_storage:
return np.full(self.shape, fillvalue, dtype=self.dtype)[args]
[docs]
def iter_chunks(self, args):
    """Iterate over chunks in a chunked dataset.

    Only an empty *args* sequence is supported, meaning the entire
    dataspace. For each chunk the iterator yields a tuple of slices (with an
    explicit step of 1) giving that chunk's region of the dataset, which can
    be used to read the data in that chunk.

    A non-empty *args* raises ``NotImplementedError``. Because this is a
    generator, that happens on first iteration, not at call time. If the
    chunk index is empty, nothing is yielded.
    """
    if not self.__chunk_init_check():
        return

    if args:
        # Restricting the iteration to a region is not what h5py actually
        # does, and what it does do is not useful, so it is not implemented.
        raise NotImplementedError(
            "h5py does something silly, and our implementation does not"
        )

    def with_explicit_step(tuple_of_slices):
        # slice(a, b, None) behaves like slice(a, b, 1) but does not compare
        # equal to it; for compatibility always emit the explicit step.
        return tuple(
            slice(s.start, s.stop, 1) if s.step is None else s
            for s in tuple_of_slices
        )

    array = ZarrArrayStub(self.shape, self.chunks)
    indexer = OrthogonalIndexer(args, array)
    for _chunk_coords, _chunk_selection, out_selection in indexer:
        yield with_explicit_step(out_selection)
##### The following property is made available to support ActiveStorage
##### and to help those who may want to generate kerchunk indices and
##### bypass the iterator methods.
@property
def index(self):
    """
    Direct access to the chunk index, if there is one, building it first
    when it is missing or empty. This is a ``pyfive`` API extension.

    Raises ``TypeError`` if the data is not chunked.
    """
    # The chunk-init check cannot be used here: it reads this property and
    # would recurse forever.
    if self.layout_class == 2:
        if not self._index:
            self._build_index()
        return self._index
    raise TypeError("Data is not chunked")
##### This property is made available to help understand object store performance
@property
def btree_range(self):
    """
    A ``(start, end)`` tuple holding the address of the first b-tree node
    for this variable and the address of the furthest away node (which may
    not be the last one in the chunk index).

    This may help in understanding the read performance of chunked data in
    object stores. ``btree_range`` is a ``pyfive`` API extension.
    """
    self.__chunk_init_check()
    start = self._btree_start
    end = self._btree_end
    return (start, end)
##### This property is made available to help understand object store performance
@property
def first_chunk(self):
    """
    The lowest byte address of any data chunk of this variable, or ``None``
    if the chunk index is empty.

    This may help in understanding the read performance of chunked data in
    object stores. ``first_chunk`` is a ``pyfive`` API extension.
    """
    self.__chunk_init_check()
    offsets = (info.byte_offset for info in self._index.values())
    return min(offsets, default=None)
#### The following method can be used to set pseudo chunking size after the
#### file has been closed and before data transactions. This is pyfive specific
[docs]
def set_pseudo_chunk_size(self, newsize_MB):
    """Set pseudo chunking size for contiguous variables.

    This is a ``pyfive`` API extension.

    The default value is 4 MB, which should be suitable for most
    applications. Arrays smaller than this value are not pseudo chunked;
    larger arrays are accessed in reads of roughly ``newsize_MB``.

    For posix files the request is silently ignored. Raises ``ValueError``
    if the variable is not contiguous.
    """
    if self.layout_class != 1:
        raise ValueError("Attempt to set pseudo chunking on non-contigous variable")
    if self.posix:
        # silently ignore it, we'll be using a np.memmap
        return
    self.pseudo_chunking_size = newsize_MB * 1024 * 1024
[docs]
def get_chunk_info_from_chunk_coord(self, chunk_coords):
    """
    Retrieve storage information about a chunk specified by its index.
    This is a ``pyfive`` API extension.

    *chunk_coords* is in chunk space (as used by ``zarr``) and is converted
    here to HDF5 coordinate space by multiplying by the chunk shape.

    If the dataset is not chunked, a ``StoreInfo`` is returned for the
    contiguous data as if it were one chunk (the chunk offset and filter
    mask are ``None``).

    The decision is made on the storage layout, not on whether an index is
    currently held, so a chunked dataset whose index has not been built yet
    is looked up through ``get_chunk_info_by_coord`` (which builds it).
    """
    if self.layout_class == 2:
        coord_index = tuple(map(mul, chunk_coords, self.chunks))
        return self.get_chunk_info_by_coord(coord_index)

    # dtype=int keeps the byte count an integer even for a scalar shape ().
    nbytes = self.dtype.itemsize * np.prod(self.shape, dtype=int)
    return StoreInfo(None, None, self.data_offset, nbytes)
######
# The following DatasetID methods are used by PyFive and you wouldn't expect
# third parties to use them. They are not H5Py methods.
######
def _build_index(self):
    """
    Build the chunk index if it doesn't exist. This is only
    called for chunked data, and only when the variable is accessed.
    That is, it is not called when we open a file, or when
    we list the variables in a file, but only when we do
    ``v = open_file['var_name']`` where ``var_name`` is chunked.

    Does nothing if an index (even an empty one) is already held. Raises
    ``RuntimeError`` if no chunk index parameters were cached.

    The index is assembled in local containers and published only once
    complete, so a failure part-way through cannot leave a partial index
    behind. For posix files the handle is closed even if reading the
    b-tree raises.
    """
    if self._index is not None:
        return

    if self._index_params is None:
        raise RuntimeError("Attempt to build index with no chunk index parameters")

    # look out for an empty dataset, which will have no btree
    if (
        np.prod(self.shape) == 0
        or self._index_params.chunk_address == UNDEFINED_ADDRESS
    ):
        self._index = {}
        self._nthindex = []
        # FIXME: There are other edge cases for self._index = {} to handle
        self._btree_end, self._btree_start = None, None
        # An empty index is still a built index.
        self.__index_built = True
        return

    logger.info(
        "[pyfive] Building chunk index (pyfive version=%s)",
        version("pyfive"),
    )

    # FIXME: How do we know it's a V1 B-tree?
    # There are potentially five different chunk indexing options according to
    # https://docs.hdfgroup.org/archive/support/HDF5/doc/H5.format.html#AppendixC
    t0 = time()
    fh = self._fh
    try:
        chunk_btree = BTreeV1RawDataChunks(
            fh,
            self._index_params.chunk_address,
            self._index_params.chunk_dims,
            fetch_fn=self._make_btree_fetch_fn(),
        )
    finally:
        if self.posix:
            fh.close()

    index = {}
    nthindex = []
    for node in chunk_btree.all_nodes[0]:
        for node_key, addr in zip(node["keys"], node["addresses"]):
            # The last element of the stored offset is dropped to form the key.
            key = node_key["chunk_offset"][:-1]
            nthindex.append(key)
            index[key] = StoreInfo(
                key, node_key["filter_mask"], addr, node_key["chunk_size"]
            )
    self._index = index
    self._nthindex = nthindex

    self._btree_start = chunk_btree.offset
    self._btree_end = chunk_btree.last_offset

    t1 = time() - t0
    if t1 < 1.0:
        elapsed = f"{t1 * 1000:.0f}ms"
    else:
        elapsed = f"{t1:.1f}s"
    logger.info(
        "[pyfive] Chunk index built: btree range=%s; elapsed=%s",
        (self._btree_start, self._btree_end),
        elapsed,
    )
    self.__index_built = True
def _make_btree_fetch_fn(self):
    """
    Return a ``fetch_fn(addresses, size)`` for b-tree node reads, or ``None``.

    ``None`` is returned when ``self._btree_parallel`` is falsy (the default
    is ``False``) or when no parallel strategy applies. The function is
    passed to ``BTreeV1RawDataChunks`` as ``fetch_fn``.

    The returned callable yields one buffer of *size* bytes per address,
    using fsspec ``cat_ranges`` for non-posix handles or an ``os.pread``
    thread pool for posix files.
    """
    if not self._btree_parallel:
        return None

    if self.posix:
        # POSIX thread pool with os.pread.
        if not hasattr(os, "pread") or self._thread_count == 0:
            return None
        filename = self._filename
        workers = self._thread_count

        def fetch_pread(addresses, size):
            with open(filename, "rb") as handle:
                fd = handle.fileno()
                with ThreadPoolExecutor(max_workers=workers) as pool:
                    return list(
                        pool.map(lambda addr: os.pread(fd, size, addr), addresses)
                    )

        return fetch_pread

    # fsspec cat_ranges for remote file-like handles.
    fh = self._fh
    actual_fh = getattr(fh, "fh", fh)
    if not self._cat_range_allowed:
        return None
    fs = getattr(actual_fh, "fs", None)
    path = getattr(actual_fh, "path", None)
    if fs is None or path is None or not hasattr(fs, "cat_ranges"):
        return None

    def fetch_cat_ranges(addresses, size):
        stops = [addr + size for addr in addresses]
        return fs.cat_ranges([path] * len(addresses), addresses, stops)

    return fetch_cat_ranges
def _get_contiguous_data(self, args, fillvalue):
    """
    Read the selection *args* from a contiguously stored dataset.

    Reference and variable-length string types have dedicated paths;
    sequence types raise ``NotImplementedError``. Other types are read via
    ``_get_direct_from_contiguous`` for non-posix storage, or via a
    copy-on-write ``np.memmap`` for posix files, falling back to the direct
    read if the memory map raises ``UnsupportedOperation``.

    *fillvalue* is only used by the variable-length string path.

    Every handle obtained here is released in a ``finally`` block, so an
    exception (including the memmap fallback) no longer leaks it.
    """
    if isinstance(self._ptype, P5ReferenceType):
        size = self._ptype.size
        if size != 8:
            raise NotImplementedError(f"Unsupported Reference type - size {size}")

        fh = self._fh
        try:
            ref_addresses = np.memmap(
                fh,
                dtype=("<u8"),
                mode="c",
                offset=self.data_offset,
                shape=self.shape,
                order=self._order,
            )
            return np.array([Reference(addr) for addr in ref_addresses])[args]
        finally:
            if self.posix:
                fh.close()

    if isinstance(self._ptype, P5VlenStringType):
        fh = self._fh
        try:
            array = get_vlen_string_data_contiguous(
                fh,
                self.data_offset,
                self._global_heaps,
                self.shape,
                self._ptype,
                fillvalue,
            )
        finally:
            if self.posix:
                fh.close()
        return array.reshape(self.shape, order=self._order)[args]

    if isinstance(self._ptype, P5SequenceType):
        raise NotImplementedError(
            f"datatype not implemented - {self._ptype.__class__.__name__}"
        )

    if not self.posix:
        # Not posix
        return self._get_direct_from_contiguous(args)

    # posix
    fh = self._fh
    try:
        # Create a memory-map to the stored array, which means that only
        # the requested sub-array needs to be brought into memory.
        view = np.memmap(
            fh,
            dtype=self.dtype,
            mode="c",
            offset=self.data_offset,
            shape=self.shape,
            order=self._order,
        )
        # Create the sub-array
        result = view[args]
        # Drop the memmap subclass so callers get a plain ndarray.
        result = result.view(type=np.ndarray)
        if not self._ptype.is_atomic:
            # if we have a type which is not atomic
            # we have to get a view
            result = result.view(self.dtype)
            # and for compounds we have to wrap any References properly
            # todo: check for Enum etc types
            if isinstance(self._ptype, P5CompoundType):
                new_dtype = dtype_replace_refs_with_object(self.dtype)
                new_array = np.empty(result.shape, dtype=new_dtype)
                new_array[:] = result
                result = _decode_array(result, new_array)
        return result
    except UnsupportedOperation:
        # Memory mapping is not possible for this handle; fall through to
        # the direct read below, after the handle has been closed.
        pass
    finally:
        fh.close()
    return self._get_direct_from_contiguous(args)
def _get_compact_data(self, dataobject):
    """
    Extract the raw bytes of a compact dataset from its layout message.

    The first message of type 8 in ``dataobject.msgs`` is the layout
    message. Its first byte is the layout version; for versions 3 and 4
    bytes 2-3 hold the little-endian data size and the data follows from
    byte 4.

    Raises ``ValueError`` if there is no layout message or the version is
    unknown, and ``NotImplementedError`` for layout versions 1 and 2.
    """
    layout = next((m for m in dataobject.msgs if m["type"] == 8), None)
    if layout is None:
        raise ValueError("No layout message in compact dataset?")

    begin = layout["offset_to_message"]
    raw = dataobject.msg_data[begin : begin + layout["size"]]

    layout_version = raw[0]
    if layout_version in (1, 2):
        raise NotImplementedError("Compact layout v1 and v2.")
    if layout_version in (3, 4):
        size = int.from_bytes(raw[2:4], "little")
        return raw[4 : 4 + size]
    raise ValueError("Unknown layout version.")
def _read_compact_data(self, args):
    """
    Interpret the cached compact bytes as an array of the dataset's dtype
    and shape, and return the sub-array selected by *args*.
    """
    flat = np.frombuffer(self._data, dtype=self.dtype)
    return flat.reshape(self.shape)[args]
def _get_direct_from_contiguous(self, args=None):
    """
    Read contiguous data with plain ``seek``/``read`` calls.

    This is the fallback when a memory map (which would otherwise be lazy)
    cannot be used, normally because we don't have a true posix file. We
    should never end up here with compressed data.

    If ``pseudo_chunking_size`` is non-zero the array is treated as if it
    were chunked into slabs of roughly that many bytes, and only the slabs
    touched by *args* are read. Otherwise the entire array is read and then
    indexed with *args*.

    The pseudo-chunk byte offset is derived from the row-major element
    strides of the full array, so it stays correct when a halved dimension
    does not divide evenly. Shrinking stops once every dimension is 1.
    NOTE(review): the pseudo-chunk path assumes the stored data is
    row-major; confirm this holds when ``self._order`` is not "C".
    """
    itemsize = self.dtype.itemsize

    def pseudo_chunk_shape():
        """Return ``(chunk_shape, chunk_nbytes)`` for the pseudo chunk size."""
        chunk_shape = np.copy(self.shape)
        while True:
            chunk_nbytes = np.prod(chunk_shape) * itemsize
            if chunk_nbytes < self.pseudo_chunking_size:
                break
            # Halve the first dimension that can still be reduced.
            for i, extent in enumerate(chunk_shape):
                if extent > 1:
                    chunk_shape[i] //= 2
                    break
            else:
                # Every dimension is already 1: cannot shrink any further.
                break
        return chunk_shape, int(chunk_nbytes)

    def row_major_byte_strides():
        """Byte stride of each dimension of the full array (C order)."""
        strides = []
        step = itemsize
        for extent in reversed(self.shape):
            strides.append(step)
            step *= int(extent)
        strides.reverse()
        return strides

    fh = self._fh
    try:
        if self.pseudo_chunking_size:
            chunk_shape, chunk_nbytes = pseudo_chunk_shape()
            byte_strides = row_major_byte_strides()

            array = ZarrArrayStub(self.shape, chunk_shape)
            indexer = OrthogonalIndexer(args, array)
            out = np.empty(indexer.shape, dtype=self.dtype, order=self._order)
            chunk_elements = int(np.prod(chunk_shape))

            for chunk_coords, chunk_selection, out_selection in indexer:
                # Byte offset of the first element of this pseudo chunk.
                start = sum(
                    int(coord) * int(extent) * stride
                    for coord, extent, stride in zip(
                        chunk_coords, chunk_shape, byte_strides
                    )
                )
                fh.seek(int(self.data_offset + start))
                chunk_buffer = fh.read(chunk_nbytes)
                chunk_data = np.frombuffer(chunk_buffer, dtype=self.dtype)
                if len(chunk_data) < chunk_elements:
                    # last chunk over end of file
                    padded = np.zeros(chunk_elements, dtype=self.dtype)
                    padded[: len(chunk_data)] = chunk_data
                    chunk_data = padded
                out[out_selection] = chunk_data.reshape(
                    chunk_shape, order=self._order
                )[chunk_selection]
            return out

        num_bytes = np.prod(self.shape, dtype=int) * itemsize
        # we need it all, let's get it all (i.e. this really does
        # read the lot)
        fh.seek(self.data_offset)
        chunk_buffer = fh.read(num_bytes)
        chunk_data = np.frombuffer(chunk_buffer, dtype=self.dtype).copy()
        chunk_data = chunk_data.reshape(self.shape, order=self._order)
        return chunk_data[args]
    finally:
        if self.posix:
            fh.close()
def _get_raw_chunk(self, storeinfo):
    """
    Obtain the bytes associated with a chunk.

    Reads ``storeinfo.size`` bytes starting at ``storeinfo.byte_offset``
    and returns them as stored. For posix files the handle is closed even
    if the seek or read raises.
    """
    fh = self._fh
    try:
        fh.seek(storeinfo.byte_offset)
        return fh.read(storeinfo.size)
    finally:
        if self.posix:
            fh.close()
def _get_selection_via_chunks(self, args):
    """
    Use the zarr orthogonal indexer to extract data for a specific
    selection within the dataset array and, in doing so, only load
    the relevant chunks.

    Reference datasets are read as ``<u8`` addresses and wrapped in
    ``Reference`` objects before returning. Variable-length strings are
    read chunk by chunk through ``get_vlen_string_data_from_chunk``; all
    other types go through ``_select_chunks``.

    For a non-reference dataset with a zero-sized shape, an empty array of
    ``self.shape`` is returned in the dataset's dtype, and *args* is not
    applied.

    Raises ``RuntimeError`` if no index is held, and
    ``NotImplementedError`` for reference types whose size is not 8.
    """
    if self._index is None:
        raise RuntimeError("Attempt to read chunked data with no index")

    # need a local dtype as we may override it for a reference read.
    dtype = self.dtype
    is_reference = isinstance(self._ptype, P5ReferenceType)

    if is_reference:
        # this is a reference and we're returning that
        size = self._ptype.size
        dtype = "<u8"
        if size != 8:
            raise NotImplementedError("Unsupported Reference type")
    elif np.prod(self.shape) == 0:
        # Nothing to read; keep the dataset's dtype on the empty result.
        return np.zeros(self.shape, dtype=dtype)

    array = ZarrArrayStub(self.shape, self.chunks)
    indexer = OrthogonalIndexer(args, array)
    out = np.empty(indexer.shape, dtype=dtype, order=self._order)

    if isinstance(self._ptype, P5VlenStringType):
        fh = self._fh
        chunk_shape = self.chunks
        global_heaps = self._global_heaps
        index = self._index
        try:
            for chunk_coords, chunk_selection, out_selection in indexer:
                chunk_coords = tuple(map(mul, chunk_coords, self.chunks))
                chunk_data = get_vlen_string_data_from_chunk(
                    fh,
                    index[chunk_coords].byte_offset,
                    global_heaps,
                    chunk_shape,
                    self._ptype,
                )
                chunk_data = chunk_data.reshape(chunk_shape)
                out[out_selection] = chunk_data[chunk_selection]
        finally:
            # Release the handle even if a chunk fails to decode.
            if self.posix:
                fh.close()
    else:
        self._select_chunks(indexer, out, dtype)

    if is_reference:
        to_reference = np.vectorize(Reference)
        out = to_reference(out)

    return out
@property
def _fh(self):
    """Return an open file handle to the parent file.

    Posix: the file is opened afresh on every access and is not cached, so
    the caller is responsible for closing the handle it receives.

    Not posix: the cached handle is returned while it is open; once it has
    been closed the file is reopened from ``self._filename`` and the new
    handle is cached.
    """
    if self.posix:
        # Posix: Open the file, without caching it.
        return open(self._filename, "rb")

    cached = self.__fh
    if not cached.closed:
        return cached

    # The cached handle was closed: reopen and remember the new one.
    reopened = open(self._filename, "rb")
    self.__fh = reopened
    return reopened
@property
def dtype(self):
    """
    Return the numpy dtype of the dataset, taken from its pyfive type.
    """
    ptype = self._ptype
    return ptype.dtype
[docs]
def get_type(self):
    """
    Return the pyfive type object of the dataset.
    """
    ptype = self._ptype
    return ptype
class DatasetMeta:
    """
    This is a convenience class to bundle up and cache the metadata
    exposed by the Dataset when DatasetId is constructed.
    """

    def __init__(self, dataobject):
        """
        Copy the metadata from *dataobject* onto this instance.

        ``attributes`` is taken from ``dataobject.get_attributes()``. An
        earlier stray assignment of ``dataobject.compression`` to
        ``attributes``, which was immediately overwritten, has been dropped.
        """
        self.maxshape = dataobject.maxshape
        self.compression = dataobject.compression
        self.compression_opts = dataobject.compression_opts
        self.shuffle = dataobject.shuffle
        self.fletcher32 = dataobject.fletcher32
        self.fillvalue = dataobject.fillvalue
        self.attributes = dataobject.get_attributes()
        self.datatype = dataobject.ptype
        # horrible kludge for now, this isn't really the same sort of thing
        # https://github.com/NCAS-CMS/pyfive/issues/13#issuecomment-2557121461
        # this is used directly in the Dataset init method.
        self.offset = dataobject.offset