Source code for asdf.tags.core.ndarray

import mmap
import sys

import numpy as np
from jsonschema import ValidationError
from numpy import ma

from ... import util
from ...types import AsdfType

_datatype_names = {
    "int8": "i1",
    "int16": "i2",
    "int32": "i4",
    "int64": "i8",
    "uint8": "u1",
    "uint16": "u2",
    "uint32": "u4",
    "uint64": "u8",
    "float32": "f4",
    "float64": "f8",
    "complex64": "c8",
    "complex128": "c16",
    "bool8": "b1",
}


_string_datatype_names = {"ascii": "S", "ucs4": "U"}


def asdf_byteorder_to_numpy_byteorder(byteorder):
    if byteorder == "big":
        return ">"
    elif byteorder == "little":
        return "<"
    raise ValueError(f"Invalid ASDF byteorder '{byteorder}'")


def asdf_datatype_to_numpy_dtype(datatype, byteorder=None):
    if byteorder is None:
        byteorder = sys.byteorder
    if isinstance(datatype, str) and datatype in _datatype_names:
        datatype = _datatype_names[datatype]
        byteorder = asdf_byteorder_to_numpy_byteorder(byteorder)
        return np.dtype(byteorder + datatype)
    elif (
        isinstance(datatype, list)
        and len(datatype) == 2
        and isinstance(datatype[0], str)
        and isinstance(datatype[1], int)
        and datatype[0] in _string_datatype_names
    ):
        length = datatype[1]
        byteorder = asdf_byteorder_to_numpy_byteorder(byteorder)
        datatype = byteorder + _string_datatype_names[datatype[0]] + str(length)
        return np.dtype(datatype)
    elif isinstance(datatype, dict):
        if "datatype" not in datatype:
            raise ValueError(f"Field entry has no datatype: '{datatype}'")
        name = datatype.get("name", "")
        byteorder = datatype.get("byteorder", byteorder)
        shape = datatype.get("shape")
        datatype = asdf_datatype_to_numpy_dtype(datatype["datatype"], byteorder)
        if shape is None:
            return (str(name), datatype)
        else:
            return (str(name), datatype, tuple(shape))
    elif isinstance(datatype, list):
        datatype_list = []
        for i, subdatatype in enumerate(datatype):
            np_dtype = asdf_datatype_to_numpy_dtype(subdatatype, byteorder)
            if isinstance(np_dtype, tuple):
                datatype_list.append(np_dtype)
            elif isinstance(np_dtype, np.dtype):
                datatype_list.append(("", np_dtype))
            else:
                raise RuntimeError("Error parsing asdf datatype")
        return np.dtype(datatype_list)
    raise ValueError(f"Unknown datatype {datatype}")


def numpy_byteorder_to_asdf_byteorder(byteorder, override=None):
    if override is not None:
        return override

    if byteorder == "=":
        return sys.byteorder
    elif byteorder == "<":
        return "little"
    else:
        return "big"


def numpy_dtype_to_asdf_datatype(dtype, include_byteorder=True, override_byteorder=None):
    dtype = np.dtype(dtype)
    if dtype.names is not None:
        fields = []
        for name in dtype.names:
            field = dtype.fields[name][0]
            d = {}
            d["name"] = name
            field_dtype, byteorder = numpy_dtype_to_asdf_datatype(field, override_byteorder=override_byteorder)
            d["datatype"] = field_dtype
            if include_byteorder:
                d["byteorder"] = byteorder
            if field.shape:
                d["shape"] = list(field.shape)
            fields.append(d)
        return fields, numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    elif dtype.subdtype is not None:
        return numpy_dtype_to_asdf_datatype(dtype.subdtype[0], override_byteorder=override_byteorder)

    elif dtype.name in _datatype_names:
        return dtype.name, numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    elif dtype.name == "bool":
        return "bool8", numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    elif dtype.name.startswith("string") or dtype.name.startswith("bytes"):
        return ["ascii", dtype.itemsize], "big"

    elif dtype.name.startswith("unicode") or dtype.name.startswith("str"):
        return (
            ["ucs4", int(dtype.itemsize / 4)],
            numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder),
        )

    raise ValueError(f"Unknown dtype {dtype}")


def inline_data_asarray(inline, dtype=None):
    # np.asarray doesn't handle structured arrays unless the innermost
    # elements are tuples.  To work around this, we drill down into the
    # first element of each level until we find a single item that
    # successfully converts to a scalar of the expected structured
    # dtype, then convert everything at that level to a tuple.  This
    # probably breaks for nested structured dtypes, but it's good
    # enough for now.  It also won't work with object dtypes, but ASDF
    # explicitly excludes those, so we're ok there.
    if dtype is not None and dtype.fields is not None:

        def find_innermost_match(line, depth=0):
            if not isinstance(line, list) or not len(line):
                raise ValueError("data can not be converted to structured array")
            try:
                np.asarray(tuple(line), dtype=dtype)
            except ValueError:
                return find_innermost_match(line[0], depth + 1)
            else:
                return depth

        depth = find_innermost_match(inline)

        def convert_to_tuples(line, data_depth, depth=0):
            if data_depth == depth:
                return tuple(line)
            else:
                return [convert_to_tuples(x, data_depth, depth + 1) for x in line]

        inline = convert_to_tuples(inline, depth)

        return np.asarray(inline, dtype=dtype)
    else:

        def handle_mask(inline):
            if isinstance(inline, list):
                if None in inline:
                    inline_array = np.asarray(inline)
                    nones = np.equal(inline_array, None)
                    return np.ma.array(np.where(nones, 0, inline), mask=nones)
                else:
                    return [handle_mask(x) for x in inline]
            return inline

        inline = handle_mask(inline)

        inline = np.ma.asarray(inline, dtype=dtype)
        if not ma.is_masked(inline):
            return inline.data
        else:
            return inline
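
# Illustrative usage (not part of the original module): None entries in
# inline data become masked elements:
#
#   >>> arr = inline_data_asarray([1, None, 3])
#   >>> arr.mask.tolist()
#   [False, True, False]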


def numpy_array_to_list(array):
    def tolist(x):
        if isinstance(x, (np.ndarray, NDArrayType)):
            if x.dtype.char == "S":
                x = x.astype("U").tolist()
            else:
                x = x.tolist()

        if isinstance(x, (list, tuple)):
            return [tolist(y) for y in x]
        else:
            return x

    def ascii_to_unicode(x):
        # Convert byte string arrays to unicode string arrays, since YAML
        # doesn't handle the former.
        if isinstance(x, list):
            return [ascii_to_unicode(y) for y in x]
        elif isinstance(x, bytes):
            return x.decode("ascii")
        else:
            return x

    result = ascii_to_unicode(tolist(array))

    return result
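
# Illustrative usage (not part of the original module): byte strings are
# decoded so the resulting nested list is YAML-serializable:
#
#   >>> numpy_array_to_list(np.array([b"a", b"b"], dtype="S1"))
#   ['a', 'b']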


class NDArrayType(AsdfType):
    name = "core/ndarray"
    version = "1.0.0"
    supported_versions = {"1.0.0", "1.1.0"}
    types = [np.ndarray, ma.MaskedArray]

    def __init__(self, source, shape, dtype, offset, strides, order, mask, asdffile):
        self._asdffile = asdffile
        self._source = source
        self._block = None
        self._array = None
        self._mask = mask

        if isinstance(source, list):
            self._array = inline_data_asarray(source, dtype)
            self._array = self._apply_mask(self._array, self._mask)
            self._block = asdffile.blocks.add_inline(self._array)
            if shape is not None:
                if (shape[0] == "*" and self._array.shape[1:] != tuple(shape[1:])) or (
                    shape[0] != "*" and self._array.shape != tuple(shape)
                ):
                    raise ValueError("inline data doesn't match the given shape")

        self._shape = shape
        self._dtype = dtype
        self._offset = offset
        self._strides = strides
        self._order = order
        if not asdffile.blocks.lazy_load:
            self._make_array()

    def _make_array(self):
        # If the ASDF file has been updated in-place, then there's a chance
        # that the block's original data object has been closed and
        # replaced.  We need to check here and re-generate the array if
        # necessary, otherwise we risk segfaults when memory mapping.
        if self._array is not None:
            base = util.get_array_base(self._array)
            if isinstance(base, np.memmap) and isinstance(base.base, mmap.mmap) and base.base.closed:
                self._array = None

        if self._array is None:
            block = self.block
            shape = self.get_actual_shape(self._shape, self._strides, self._dtype, len(block))

            if block.trust_data_dtype:
                dtype = block.data.dtype
            else:
                dtype = self._dtype

            self._array = np.ndarray(shape, dtype, block.data, self._offset, self._strides, self._order)
            self._array = self._apply_mask(self._array, self._mask)
            if block.readonly:
                self._array.setflags(write=False)
        return self._array

    def _apply_mask(self, array, mask):
        if isinstance(mask, (np.ndarray, NDArrayType)):
            # Use "mask.view()" here so the underlying possibly memmapped
            # mask array is freed properly when the masked array goes away.
            array = ma.array(array, mask=mask.view())
            # assert util.get_array_base(array.mask) is util.get_array_base(mask)
            return array
        elif np.isscalar(mask):
            if np.isnan(mask):
                return ma.array(array, mask=np.isnan(array))
            else:
                return ma.masked_values(array, mask)
        return array

    def __array__(self):
        return self._make_array()

    def __repr__(self):
        # repr alone should not force loading of the data
        if self._array is None:
            return (
                f"<{'array' if self._mask is None else 'masked array'} "
                f"(unloaded) shape: {self._shape} dtype: {self._dtype}>"
            )
        return repr(self._make_array())

    def __str__(self):
        # str alone should not force loading of the data
        if self._array is None:
            return (
                f"<{'array' if self._mask is None else 'masked array'} "
                f"(unloaded) shape: {self._shape} dtype: {self._dtype}>"
            )
        return str(self._make_array())

    def get_actual_shape(self, shape, strides, dtype, block_size):
        """
        Get the actual shape of an array, by computing it against the
        block_size if it contains a ``*``.
        """
        num_stars = shape.count("*")
        if num_stars == 0:
            return shape
        elif num_stars == 1:
            if shape[0] != "*":
                raise ValueError("'*' may only be in first entry of shape")
            if strides is not None:
                stride = strides[0]
            else:
                stride = np.prod(shape[1:]) * dtype.itemsize
            missing = int(block_size / stride)
            return [missing] + shape[1:]
        raise ValueError(f"Invalid shape '{shape}'")
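
    # Worked example (illustrative, not part of the original module): for a
    # streamed shape ["*", 3] of float64 data (itemsize 8) with no explicit
    # strides, one row is 3 * 8 = 24 bytes, so a 96-byte block yields a
    # missing leading dimension of 96 // 24 = 4 and an actual shape of [4, 3].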

    @property
    def block(self):
        if self._block is None:
            self._block = self._asdffile.blocks.get_block(self._source)
        return self._block

    @property
    def shape(self):
        if self._shape is None:
            return self.__array__().shape
        if "*" in self._shape:
            return tuple(self.get_actual_shape(self._shape, self._strides, self._dtype, len(self.block)))
        return tuple(self._shape)

    @property
    def dtype(self):
        if self._array is None:
            return self._dtype
        else:
            return self._make_array().dtype

    def __len__(self):
        if self._array is None:
            return self._shape[0]
        else:
            return len(self._make_array())

    def __getattr__(self, attr):
        # We need to ignore __array_struct__, or unicode arrays end up
        # getting "double casted" and upsized.  This also reduces the
        # number of array creations in the general case.
        if attr == "__array_struct__":
            raise AttributeError()
        return getattr(self._make_array(), attr)

    def __setitem__(self, *args):
        # This workaround appears to be necessary in order to avoid a
        # segfault in the case that array assignment causes an exception.
        # The segfault originates from the call to __repr__ inside the
        # traceback report.
        try:
            self._make_array().__setitem__(*args)
        except Exception as e:
            self._array = None
            raise e from None

    def __getattribute__(self, name):
        # The presence of these attributes on an NDArrayType instance can
        # cause problems when the array is passed to other libraries.
        # See https://github.com/asdf-format/asdf/issues/1015
        if name in ("name", "version", "supported_versions"):
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
        else:
            return AsdfType.__getattribute__(self, name)

    @classmethod
    def from_tree(cls, node, ctx):
        if isinstance(node, list):
            return cls(node, None, None, None, None, None, None, ctx)

        elif isinstance(node, dict):
            source = node.get("source")
            data = node.get("data")
            if source and data:
                raise ValueError("Both source and data may not be provided at the same time")
            if data:
                source = data
            shape = node.get("shape", None)
            if data is not None:
                byteorder = sys.byteorder
            else:
                byteorder = node["byteorder"]
            if "datatype" in node:
                dtype = asdf_datatype_to_numpy_dtype(node["datatype"], byteorder)
            else:
                dtype = None
            offset = node.get("offset", 0)
            strides = node.get("strides", None)
            mask = node.get("mask", None)

            return cls(source, shape, dtype, offset, strides, "A", mask, ctx)

        raise TypeError("Invalid ndarray description.")

    @classmethod
    def reserve_blocks(cls, data, ctx):
        # Find all of the used data buffers so we can add or rearrange
        # them if necessary
        if isinstance(data, np.ndarray):
            yield ctx.blocks.find_or_create_block_for_array(data, ctx)
        elif isinstance(data, NDArrayType):
            yield data.block

    @classmethod
    def to_tree(cls, data, ctx):
        # The ndarray-1.0.0 schema does not permit 0 valued strides.
        # Perhaps we'll want to allow this someday, to efficiently
        # represent an array of all the same value.
        if any(stride == 0 for stride in data.strides):
            data = np.ascontiguousarray(data)

        # The view computations that follow assume that the base array
        # is contiguous.  If not, we need to make a copy to avoid
        # writing a nonsense view.
        base = util.get_array_base(data)
        if not base.flags.forc:
            data = np.ascontiguousarray(data)
            base = util.get_array_base(data)

        shape = data.shape

        block = ctx.blocks.find_or_create_block_for_array(data, ctx)

        if block.array_storage == "fits":
            # Views over arrays stored in FITS files have some idiosyncrasies.
            # astropy.io.fits always writes arrays C-contiguous with big-endian
            # byte order, whereas asdf preserves the "contiguousity" and byte
            # order of the base array.
            if (
                block.data.shape != data.shape
                or block.data.dtype != data.dtype
                or block.data.ctypes.data != data.ctypes.data
                or block.data.strides != data.strides
            ):
                raise ValueError(
                    "ASDF has only limited support for serializing views over arrays stored "
                    "in FITS HDUs.  This error likely means that a slice of such an array "
                    "was found in the ASDF tree.  The slice can be decoupled from the FITS "
                    "array by calling copy() before assigning it to the tree."
                )

            offset = 0
            strides = None
            dtype, byteorder = numpy_dtype_to_asdf_datatype(
                data.dtype,
                include_byteorder=(block.array_storage != "inline"),
                override_byteorder="big",
            )
        else:
            # Compute the offset relative to the base array and not the
            # block data, in case the block is compressed.
            offset = data.ctypes.data - base.ctypes.data

            if data.flags.c_contiguous:
                strides = None
            else:
                strides = data.strides

            dtype, byteorder = numpy_dtype_to_asdf_datatype(
                data.dtype,
                include_byteorder=(block.array_storage != "inline"),
            )

        result = {}

        result["shape"] = list(shape)
        if block.array_storage == "streamed":
            result["shape"][0] = "*"

        if block.array_storage == "inline":
            listdata = numpy_array_to_list(data)
            result["data"] = listdata
            result["datatype"] = dtype
        else:
            result["source"] = ctx.blocks.get_source(block)
            result["datatype"] = dtype
            result["byteorder"] = byteorder

            if offset > 0:
                result["offset"] = offset

            if strides is not None:
                result["strides"] = list(strides)

        if isinstance(data, ma.MaskedArray) and np.any(data.mask):
            if block.array_storage == "inline":
                ctx.blocks.set_array_storage(ctx.blocks[data.mask], "inline")
            result["mask"] = data.mask

        return result
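
    # Sketch of a resulting tree node for a block-stored array (illustrative,
    # not part of the original module; "source" is the block index assigned
    # by ctx.blocks.get_source):
    #
    #   {"shape": [4, 3], "source": 0, "datatype": "float64",
    #    "byteorder": "little"}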

    @classmethod
    def _assert_equality(cls, old, new, func):
        if old.dtype.fields:
            if not new.dtype.fields:
                # This line is safe because this is actually a piece of test
                # code, even though it lives in this file:
                assert False, "arrays not equal"  # nosec
            for a, b in zip(old, new):
                cls._assert_equality(a, b, func)
        else:
            old = old.__array__()
            new = new.__array__()
            if old.dtype.char in "SU":
                if old.dtype.char == "S":
                    old = old.astype("U")
                if new.dtype.char == "S":
                    new = new.astype("U")
                old = old.tolist()
                new = new.tolist()
                # This line is safe because this is actually a piece of test
                # code, even though it lives in this file:
                assert old == new  # nosec
            else:
                func(old, new)

    @classmethod
    def assert_equal(cls, old, new):
        from numpy.testing import assert_array_equal

        cls._assert_equality(old, new, assert_array_equal)

    @classmethod
    def assert_allclose(cls, old, new):
        from numpy.testing import assert_allclose, assert_array_equal

        if old.dtype.kind in "iu" and new.dtype.kind in "iu":
            cls._assert_equality(old, new, assert_array_equal)
        else:
            cls._assert_equality(old, new, assert_allclose)

    @classmethod
    def copy_to_new_asdf(cls, node, asdffile):
        if isinstance(node, NDArrayType):
            array = node._make_array()
            asdffile.blocks.set_array_storage(asdffile.blocks[array], node.block.array_storage)
            return node._make_array()
        return node


def _make_operation(name):
    # Forward the named operator to the lazily-loaded underlying array.
    def __operation__(self, *args):
        return getattr(self._make_array(), name)(*args)

    return __operation__


# Attach array operators to NDArrayType (and its versioned siblings) so
# instances behave like ndarrays without forcing an eager load.
classes_to_modify = NDArrayType.__versioned_siblings + [NDArrayType]
for op in [
    "__neg__", "__pos__", "__abs__", "__invert__", "__complex__",
    "__int__", "__long__", "__float__", "__oct__", "__hex__",
    "__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__",
    "__cmp__", "__rcmp__", "__add__", "__sub__", "__mul__",
    "__floordiv__", "__mod__", "__divmod__", "__pow__",
    "__lshift__", "__rshift__", "__and__", "__xor__", "__or__",
    "__div__", "__truediv__", "__radd__", "__rsub__", "__rmul__",
    "__rdiv__", "__rtruediv__", "__rfloordiv__", "__rmod__",
    "__rdivmod__", "__rpow__", "__rlshift__", "__rrshift__",
    "__rand__", "__rxor__", "__ror__", "__iadd__", "__isub__",
    "__imul__", "__idiv__", "__itruediv__", "__ifloordiv__",
    "__imod__", "__ipow__", "__ilshift__", "__irshift__",
    "__iand__", "__ixor__", "__ior__", "__getitem__",
    "__delitem__", "__contains__",
]:
    for cls in classes_to_modify:
        setattr(cls, op, _make_operation(op))
del classes_to_modify


def _get_ndim(instance):
    if isinstance(instance, list):
        array = inline_data_asarray(instance)
        return array.ndim
    elif isinstance(instance, dict):
        if "shape" in instance:
            return len(instance["shape"])
        elif "data" in instance:
            array = inline_data_asarray(instance["data"])
            return array.ndim
    elif isinstance(instance, (np.ndarray, NDArrayType)):
        return len(instance.shape)


def validate_ndim(validator, ndim, instance, schema):
    in_ndim = _get_ndim(instance)

    if in_ndim != ndim:
        yield ValidationError(
            f"Wrong number of dimensions: Expected {ndim}, got {in_ndim}",
            instance=repr(instance),
        )


def validate_max_ndim(validator, max_ndim, instance, schema):
    in_ndim = _get_ndim(instance)

    if in_ndim > max_ndim:
        yield ValidationError(
            f"Wrong number of dimensions: Expected max of {max_ndim}, got {in_ndim}",
            instance=repr(instance),
        )


def validate_datatype(validator, datatype, instance, schema):
    if isinstance(instance, list):
        array = inline_data_asarray(instance)
        in_datatype, _ = numpy_dtype_to_asdf_datatype(array.dtype)
    elif isinstance(instance, dict):
        if "datatype" in instance:
            in_datatype = instance["datatype"]
        elif "data" in instance:
            array = inline_data_asarray(instance["data"])
            in_datatype, _ = numpy_dtype_to_asdf_datatype(array.dtype)
        else:
            raise ValidationError("Not an array")
    elif isinstance(instance, (np.ndarray, NDArrayType)):
        in_datatype, _ = numpy_dtype_to_asdf_datatype(instance.dtype)
    else:
        raise ValidationError("Not an array")

    if datatype == in_datatype:
        return

    if schema.get("exact_datatype", False):
        yield ValidationError(f"Expected datatype '{datatype}', got '{in_datatype}'")

    np_datatype = asdf_datatype_to_numpy_dtype(datatype)
    np_in_datatype = asdf_datatype_to_numpy_dtype(in_datatype)

    if not np_datatype.fields:
        if np_in_datatype.fields:
            yield ValidationError(f"Expected scalar datatype '{datatype}', got '{in_datatype}'")

        if not np.can_cast(np_in_datatype, np_datatype, "safe"):
            yield ValidationError(f"Can not safely cast from '{in_datatype}' to '{datatype}'")
    else:
        if not np_in_datatype.fields:
            yield ValidationError(f"Expected structured datatype '{datatype}', got '{in_datatype}'")

        if len(np_in_datatype.fields) != len(np_datatype.fields):
            yield ValidationError(
                f"Mismatch in number of columns: Expected {len(datatype)}, got {len(in_datatype)}"
            )

        for i in range(len(np_datatype.fields)):
            in_type = np_in_datatype[i]
            out_type = np_datatype[i]
            if not np.can_cast(in_type, out_type, "safe"):
                yield ValidationError(
                    "Can not safely cast to expected datatype: "
                    f"Expected {numpy_dtype_to_asdf_datatype(out_type)[0]}, "
                    f"got {numpy_dtype_to_asdf_datatype(in_type)[0]}"
                )


NDArrayType.validators = {
    "ndim": validate_ndim,
    "max_ndim": validate_max_ndim,
    "datatype": validate_datatype,
}