import mmap
import sys
import numpy as np
from numpy import ma
from asdf import util
from asdf._jsonschema import ValidationError
# Lookup from ASDF scalar datatype names to the NumPy type-code string
# (kind character followed by item size in bytes).  The codes carry no
# byte-order prefix; asdf_datatype_to_numpy_dtype prepends one.
_datatype_names = {
"int8": "i1",
"int16": "i2",
"int32": "i4",
"int64": "i8",
"uint8": "u1",
"uint16": "u2",
"uint32": "u4",
"uint64": "u8",
"float32": "f4",
"float64": "f8",
"complex64": "c8",
"complex128": "c16",
"bool8": "b1",
}
# Lookup from ASDF string datatype kinds to the NumPy string kind
# character.  asdf_datatype_to_numpy_dtype appends the length taken from
# the ASDF ``[kind, length]`` pair.
_string_datatype_names = {"ascii": "S", "ucs4": "U"}
def asdf_byteorder_to_numpy_byteorder(byteorder):
    """
    Translate an ASDF byteorder name into a NumPy byte-order character.

    Parameters
    ----------
    byteorder : str
        Either ``"big"`` or ``"little"``.

    Returns
    -------
    str
        ``">"`` for ``"big"``, ``"<"`` for ``"little"``.

    Raises
    ------
    ValueError
        If ``byteorder`` is neither of the two accepted names.
    """
    # Plain equality comparisons (rather than a dict lookup) so that any
    # unrecognised value, hashable or not, ends up in the ValueError below.
    for asdf_name, numpy_char in (("big", ">"), ("little", "<")):
        if byteorder == asdf_name:
            return numpy_char

    raise ValueError(f"Invalid ASDF byteorder '{byteorder}'")
def asdf_datatype_to_numpy_dtype(datatype, byteorder=None):
    """
    Convert an ASDF datatype description into its NumPy equivalent.

    Parameters
    ----------
    datatype : str, list or dict
        One of:

        - a scalar type name found in ``_datatype_names``;
        - a two element list ``[kind, length]`` whose kind is found in
          ``_string_datatype_names``;
        - a dict describing one field (``"datatype"`` is required,
          ``"name"``, ``"byteorder"`` and ``"shape"`` are optional);
        - a list of any of the above, describing a structured type.
    byteorder : str, optional
        ASDF byteorder name (``"big"`` or ``"little"``).  Defaults to
        ``sys.byteorder``.

    Returns
    -------
    numpy.dtype or tuple
        A ``numpy.dtype`` for scalar, string and list inputs.  For a dict
        input a ``(name, dtype)`` or ``(name, dtype, shape)`` tuple is
        returned so the caller can assemble a structured dtype from it.

    Raises
    ------
    ValueError
        If a field dict has no ``"datatype"`` entry, the byteorder is
        invalid, or ``datatype`` has an unrecognised form.
    RuntimeError
        If an element of a structured list converts to something that is
        neither a tuple nor a ``numpy.dtype``.
    """
    if byteorder is None:
        byteorder = sys.byteorder

    # Named scalar type, e.g. "float64".
    if isinstance(datatype, str) and datatype in _datatype_names:
        type_code = _datatype_names[datatype]
        order_char = asdf_byteorder_to_numpy_byteorder(byteorder)
        return np.dtype(str(order_char + type_code))

    # Fixed-length string type, e.g. ["ascii", 8].
    looks_like_string_type = (
        isinstance(datatype, list)
        and len(datatype) == 2
        and isinstance(datatype[0], str)
        and isinstance(datatype[1], int)
        and datatype[0] in _string_datatype_names
    )
    if looks_like_string_type:
        kind, length = datatype
        order_char = asdf_byteorder_to_numpy_byteorder(byteorder)
        return np.dtype(f"{order_char}{_string_datatype_names[kind]}{length}")

    # A single field of a structured type.
    if isinstance(datatype, dict):
        if "datatype" not in datatype:
            raise ValueError(f"Field entry has no datatype: '{datatype}'")

        field_name = datatype.get("name", "")
        # A per-field byteorder takes precedence over the inherited one.
        field_byteorder = datatype.get("byteorder", byteorder)
        field_shape = datatype.get("shape")
        field_dtype = asdf_datatype_to_numpy_dtype(datatype["datatype"], field_byteorder)

        if field_shape is None:
            return (str(field_name), field_dtype)
        return (str(field_name), field_dtype, tuple(field_shape))

    # A structured type: every element becomes one field.
    if isinstance(datatype, list):
        field_specs = []
        for element in datatype:
            converted = asdf_datatype_to_numpy_dtype(element, byteorder)
            if isinstance(converted, np.dtype):
                # Unnamed field; the empty name lets NumPy pick one.
                converted = ("", converted)
            elif not isinstance(converted, tuple):
                raise RuntimeError("Error parsing asdf datatype")
            field_specs.append(converted)
        return np.dtype(field_specs)

    raise ValueError(f"Unknown datatype {datatype}")
def numpy_byteorder_to_asdf_byteorder(byteorder, override=None):
    """
    Translate a NumPy byte-order character into an ASDF byteorder name.

    Parameters
    ----------
    byteorder : str
        NumPy byte-order character.  ``"="`` maps to ``sys.byteorder`` and
        ``"<"`` to ``"little"``; every other value maps to ``"big"``.
    override : str, optional
        When not None this value is returned unchanged and ``byteorder``
        is ignored.

    Returns
    -------
    str
        The ASDF byteorder name (or ``override``).
    """
    if override is None:
        if byteorder == "=":
            resolved = sys.byteorder
        elif byteorder == "<":
            resolved = "little"
        else:
            resolved = "big"
        return resolved

    return override
def numpy_dtype_to_asdf_datatype(dtype, include_byteorder=True, override_byteorder=None):
    """
    Convert a NumPy dtype into an ASDF datatype description.

    Parameters
    ----------
    dtype : numpy.dtype or anything accepted by ``numpy.dtype``
        The dtype to describe.
    include_byteorder : bool, optional
        For structured dtypes, whether each field entry gets a
        ``"byteorder"`` key.  The setting is applied at every nesting
        level.
    override_byteorder : str, optional
        When not None, reported as the byteorder instead of the one
        derived from the dtype (fixed-length byte strings always report
        ``"big"``).

    Returns
    -------
    datatype : str or list
        A scalar type name, a ``[kind, length]`` string description, or a
        list of field dicts for structured dtypes.
    byteorder : str
        The ASDF byteorder name.

    Raises
    ------
    ValueError
        If the dtype has no ASDF equivalent.
    """
    dtype = np.dtype(dtype)

    if dtype.names is not None:
        fields = []
        for name in dtype.names:
            field = dtype.fields[name][0]
            # Forward include_byteorder so nested structured fields follow
            # the same setting as the outermost level.
            field_datatype, field_byteorder = numpy_dtype_to_asdf_datatype(
                field,
                include_byteorder=include_byteorder,
                override_byteorder=override_byteorder,
            )
            entry = {"name": name, "datatype": field_datatype}
            if include_byteorder:
                entry["byteorder"] = field_byteorder
            if field.shape:
                entry["shape"] = list(field.shape)
            fields.append(entry)
        return fields, numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    if dtype.subdtype is not None:
        # Sub-array dtype: describe the base type; the shape is recorded
        # by the caller from ``field.shape``.
        return numpy_dtype_to_asdf_datatype(
            dtype.subdtype[0],
            include_byteorder=include_byteorder,
            override_byteorder=override_byteorder,
        )

    if dtype.name in _datatype_names:
        return dtype.name, numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    if dtype.name == "bool":
        return "bool8", numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder)

    if dtype.name.startswith(("string", "bytes")):
        return ["ascii", dtype.itemsize], "big"

    if dtype.name.startswith(("unicode", "str")):
        # Each UCS4 character occupies four bytes.
        return (
            ["ucs4", dtype.itemsize // 4],
            numpy_byteorder_to_asdf_byteorder(dtype.byteorder, override=override_byteorder),
        )

    msg = f"Unknown dtype {dtype}"
    raise ValueError(msg)
def inline_data_asarray(inline, dtype=None):
    """
    Build an array from inline (nested list) data.

    Parameters
    ----------
    inline : list or scalar
        The inline data.  ``None`` entries mark masked elements when the
        dtype is not structured.
    dtype : numpy.dtype, optional
        Requested dtype of the result.

    Returns
    -------
    numpy.ndarray or numpy.ma.MaskedArray
        A masked array when any element ends up masked, otherwise a plain
        array.  Structured dtypes always give a plain array.

    Raises
    ------
    ValueError
        If a structured dtype is requested and no nesting level of
        ``inline`` converts to a record of that dtype.
    """
    if dtype is None or dtype.fields is None:
        # Non-structured data: replace None entries by masked zeros.
        def mask_nones(node):
            if not isinstance(node, list):
                return node
            if None not in node:
                return [mask_nones(child) for child in node]
            as_objects = np.asarray(node)
            missing = np.equal(as_objects, None)
            return np.ma.array(np.where(missing, 0, node), mask=missing)

        converted = np.ma.asarray(mask_nones(inline), dtype=dtype)
        if ma.is_masked(converted):
            return converted
        return converted.data

    # np.asarray only handles structured arrays when the innermost
    # elements are tuples.  Follow the first element of each level until
    # one converts to a scalar of the structured dtype, then turn every
    # list at that level into a tuple.  This probably breaks for nested
    # structured dtypes, and does not work with object dtypes (which ASDF
    # excludes anyway).
    def record_level(node, level=0):
        if not isinstance(node, list) or not len(node):
            raise ValueError("data can not be converted to structured array")
        try:
            np.asarray(tuple(node), dtype=dtype)
            return level
        except ValueError:
            return record_level(node[0], level + 1)

    def tuples_at(node, target_level, level=0):
        if level == target_level:
            return tuple(node)
        return [tuples_at(child, target_level, level + 1) for child in node]

    records = tuples_at(inline, record_level(inline))
    return np.asarray(records, dtype=dtype)
def numpy_array_to_list(array):
    """
    Convert an array into nested Python lists.

    Tuples in the converted data become lists, and byte strings are
    decoded to ``str`` because YAML does not handle byte strings.

    Parameters
    ----------
    array : numpy.ndarray, NDArrayType or nested sequence
        The data to convert.

    Returns
    -------
    list or scalar
        The nested list form of ``array``.
    """

    def as_nested_lists(value):
        if isinstance(value, (np.ndarray, NDArrayType)):
            if value.dtype.char == "S":
                value = value.astype("U").tolist()
            else:
                value = value.tolist()

        if not isinstance(value, (list, tuple)):
            return value
        return [as_nested_lists(item) for item in value]

    def decode_bytes(value):
        # Byte strings can remain after tolist(); decode them as ASCII.
        if isinstance(value, list):
            return [decode_bytes(item) for item in value]
        if isinstance(value, bytes):
            return value.decode("ascii")
        return value

    nested = as_nested_lists(array)
    return decode_bytes(nested)
class NDArrayType:
    """
    Array wrapper that defers creation of the underlying numpy array.

    Inline data (a list ``source``) is converted to an array when the
    instance is created.  For any other ``source`` the array is built on
    first use from the buffer returned by ``data_callback``.  Attribute
    access that is not handled here is forwarded to the underlying array.

    Parameters
    ----------
    source : list, str or other
        A list is treated as inline data.  For a str the buffer is
        obtained with ``data_callback()``, otherwise with
        ``data_callback(_attr="cached_data")``.
    shape : list or None
        Shape of the array; the first entry may be ``"*"`` to have it
        computed from the size of the data buffer.
    dtype : numpy.dtype
        dtype of the array.
    offset : int
        Offset passed to ``numpy.ndarray`` when wrapping the buffer.
    strides : sequence or None
        Strides passed to ``numpy.ndarray`` when wrapping the buffer.
    order : str
        Memory order passed to ``numpy.ndarray``.
    mask : numpy.ndarray, NDArrayType, scalar or None
        Mask array, or a scalar value marking masked elements (NaN masks
        the NaN elements).  Anything else leaves the array unmasked.
    data_callback : callable, optional
        Provides the data buffer for non-inline sources.

    Raises
    ------
    ValueError
        If inline data does not match the given shape.
    """

    # Per-instance state assigned in ``__init__``.  ``__getattr__`` never
    # forwards these names: on an instance whose state has not been set
    # (for example while it is being copied or unpickled) forwarding would
    # recurse through ``_make_array`` without bound.
    _state_attributes = frozenset(
        {
            "_source",
            "_data_callback",
            "_array",
            "_mask",
            "_shape",
            "_dtype",
            "_offset",
            "_strides",
            "_order",
        },
    )

    def __init__(self, source, shape, dtype, offset, strides, order, mask, data_callback=None):
        self._source = source
        self._data_callback = data_callback
        self._array = None
        self._mask = mask

        if isinstance(source, list):
            self._array = inline_data_asarray(source, dtype)
            self._array = self._apply_mask(self._array, self._mask)
            # single element structured arrays can have shape == ()
            # https://github.com/asdf-format/asdf/issues/1540
            if shape is not None and (
                self._array.shape != tuple(shape)
                or (len(shape) and shape[0] == "*" and self._array.shape[1:] != tuple(shape[1:]))
            ):
                msg = "inline data doesn't match the given shape"
                raise ValueError(msg)

        self._shape = shape
        self._dtype = dtype
        self._offset = offset
        self._strides = strides
        self._order = order

    def _make_array(self):
        """
        Return the underlying array, creating or re-creating it if needed.

        Raises
        ------
        OSError
            If the data buffer is backed by a closed memory map.
        """
        # If the ASDF file has been updated in-place, then there's
        # a chance that the block's original data object has been
        # closed and replaced. We need to check here and re-generate
        # the array if necessary, otherwise we risk segfaults when
        # memory mapping.
        if self._array is not None:
            base = util.get_array_base(self._array)
            if isinstance(base, np.memmap) and isinstance(base.base, mmap.mmap):
                # check if the underlying mmap matches the one generated by generic_io
                try:
                    fd = self._data_callback(_attr="_fd")()
                except AttributeError:
                    # external blocks do not have a '_fd' and don't need to be updated
                    fd = None
                if fd is not None:
                    if getattr(fd, "_mmap", None) is not base.base:
                        self._array = None
                    del fd

        if self._array is None:
            if isinstance(self._source, str):
                # we need to keep _source as a str to allow stdatamodels to
                # support AsdfInFits
                data = self._data_callback()
            else:
                # cached data is used here so that multiple NDArrayTypes will all use
                # the same base array
                data = self._data_callback(_attr="cached_data")

            if hasattr(data, "base") and isinstance(data.base, mmap.mmap) and data.base.closed:
                msg = "ASDF file has already been closed. Can not get the data."
                raise OSError(msg)

            # compute shape (streaming blocks have '0' data size in the block header)
            shape = self.get_actual_shape(
                self._shape,
                self._strides,
                self._dtype,
                data.size,
            )

            self._array = np.ndarray(shape, self._dtype, data, self._offset, self._strides, self._order)
            self._array = self._apply_mask(self._array, self._mask)

        return self._array

    def _apply_mask(self, array, mask):
        """
        Return ``array`` with ``mask`` applied.

        ``mask`` may be an array (used as the mask itself) or a scalar
        (elements equal to it are masked, NaN masks the NaN elements).
        Any other value returns ``array`` unchanged.
        """
        if isinstance(mask, (np.ndarray, NDArrayType)):
            # Use "mask.view()" here so the underlying possibly
            # memmapped mask array is freed properly when the masked
            # array goes away.
            array = ma.array(array, mask=mask.view())
            return array

        if np.isscalar(mask):
            if np.isnan(mask):
                return ma.array(array, mask=np.isnan(array))
            return ma.masked_values(array, mask)

        return array

    def __array__(self):
        return self._make_array()

    def _unloaded_summary(self):
        """Describe the array without loading its data."""
        kind = "array" if self._mask is None else "masked array"
        return f"<{kind} (unloaded) shape: {self._shape} dtype: {self._dtype}>"

    def __repr__(self):
        # repr alone should not force loading of the data
        if self._array is None:
            return self._unloaded_summary()
        return repr(self._make_array())

    def __str__(self):
        # str alone should not force loading of the data
        if self._array is None:
            return self._unloaded_summary()
        return str(self._make_array())

    def get_actual_shape(self, shape, strides, dtype, block_size):
        """
        Get the actual shape of an array, by computing it against the
        block_size if it contains a ``*``.

        Parameters
        ----------
        shape : list or tuple
            Requested shape; only the first entry may be ``"*"``.
        strides : sequence or None
            Strides of the array.  When None the stride of the first
            axis is computed from the remaining shape and the item size.
        dtype : numpy.dtype
            dtype of the array; only used when ``strides`` is None.
        block_size : int
            Size of the data buffer.

        Returns
        -------
        list or tuple
            ``shape`` itself when it contains no ``"*"``, otherwise a
            list with the first entry replaced by the computed length.

        Raises
        ------
        ValueError
            If ``"*"`` appears more than once or not in the first entry.
        """
        num_stars = shape.count("*")
        if num_stars == 0:
            return shape

        if num_stars == 1:
            if shape[0] != "*":
                msg = "'*' may only be in first entry of shape"
                raise ValueError(msg)

            stride = strides[0] if strides is not None else np.prod(shape[1:]) * dtype.itemsize
            missing = int(block_size / stride)
            # list() so that tuple shapes are accepted as well
            return [missing] + list(shape[1:])

        msg = f"Invalid shape '{shape}'"
        raise ValueError(msg)

    @property
    def shape(self):
        if self._shape is None or self._array is not None or "*" in self._shape:
            # streamed blocks have a '0' data_size in the header so we
            # need to make the array to get the shape
            return self.__array__().shape
        return tuple(self._shape)

    @property
    def dtype(self):
        if self._array is None:
            return self._dtype
        return self._make_array().dtype

    def __len__(self):
        shape = self._shape
        # The stored shape only answers the question when its first
        # entry is a concrete number; a missing or empty shape, or a
        # streamed ("*") first dimension, requires the actual array.
        if self._array is None and shape is not None and len(shape) and shape[0] != "*":
            return shape[0]
        return len(self._make_array())

    def __getattr__(self, attr):
        # We need to ignore __array_struct__, or unicode arrays end up
        # getting "double casted" and upsized. This also reduces the
        # number of array creations in the general case.
        if attr == "__array_struct__":
            raise AttributeError(attr)
        # AsdfFile.info will call hasattr(obj, "__asdf_traverse__") which
        # will trigger this method, making the array, and loading the array
        # data. Intercept this and raise AttributeError as this class does
        # not support that method
        # see: https://github.com/asdf-format/asdf/issues/1553
        if attr == "__asdf_traverse__":
            raise AttributeError(attr)
        # Reaching this point for one of our own state attributes means
        # it has not been set yet; forwarding would recurse forever.
        if attr in self._state_attributes:
            raise AttributeError(attr)
        return getattr(self._make_array(), attr)

    def __setitem__(self, *args):
        # This workaround appears to be necessary in order to avoid a segfault
        # in the case that array assignment causes an exception. The segfault
        # originates from the call to __repr__ inside the traceback report.
        try:
            self._make_array().__setitem__(*args)
        except Exception:
            self._array = None
            raise
def _make_operation(name):
    """
    Build a method that forwards the special method ``name`` to the
    underlying array of the instance it is called on.
    """

    def operation(self, *args):
        target = self._make_array()
        return getattr(target, name)(*args)

    return operation


# Install a forwarding implementation on NDArrayType for each of the
# special methods below, so operators and item access act on the
# underlying array.
for op in (
    # unary operators and scalar conversions
    "__neg__",
    "__pos__",
    "__abs__",
    "__invert__",
    "__complex__",
    "__int__",
    "__long__",
    "__float__",
    "__oct__",
    "__hex__",
    # comparisons
    "__lt__",
    "__le__",
    "__eq__",
    "__ne__",
    "__gt__",
    "__ge__",
    "__cmp__",
    "__rcmp__",
    # binary operators
    "__add__",
    "__sub__",
    "__mul__",
    "__floordiv__",
    "__mod__",
    "__divmod__",
    "__pow__",
    "__lshift__",
    "__rshift__",
    "__and__",
    "__xor__",
    "__or__",
    "__div__",
    "__truediv__",
    # reflected binary operators
    "__radd__",
    "__rsub__",
    "__rmul__",
    "__rdiv__",
    "__rtruediv__",
    "__rfloordiv__",
    "__rmod__",
    "__rdivmod__",
    "__rpow__",
    "__rlshift__",
    "__rrshift__",
    "__rand__",
    "__rxor__",
    "__ror__",
    # in-place operators
    "__iadd__",
    "__isub__",
    "__imul__",
    "__idiv__",
    "__itruediv__",
    "__ifloordiv__",
    "__imod__",
    "__ipow__",
    "__ilshift__",
    "__irshift__",
    "__iand__",
    "__ixor__",
    "__ior__",
    # container protocol
    "__getitem__",
    "__delitem__",
    "__contains__",
):
    setattr(NDArrayType, op, _make_operation(op))
def _get_ndim(instance):
    """
    Return the number of dimensions of ``instance``.

    ``instance`` may be inline data (a list), a dict with a ``"shape"``
    or ``"data"`` entry, a ``numpy.ndarray`` or an ``NDArrayType``.
    None is returned when the number of dimensions can not be determined.
    """
    ndim = None

    if isinstance(instance, list):
        ndim = inline_data_asarray(instance).ndim
    elif isinstance(instance, dict):
        # A declared shape wins over inspecting the inline data.
        if "shape" in instance:
            ndim = len(instance["shape"])
        elif "data" in instance:
            ndim = inline_data_asarray(instance["data"]).ndim
    elif isinstance(instance, (np.ndarray, NDArrayType)):
        ndim = len(instance.shape)

    return ndim
def validate_ndim(validator, ndim, instance, schema):
    """
    Validator for the ``ndim`` schema keyword.

    Yields a ``ValidationError`` when the number of dimensions of
    ``instance`` differs from ``ndim``.
    """
    actual_ndim = _get_ndim(instance)
    dimensions_differ = actual_ndim != ndim

    if dimensions_differ:
        message = f"Wrong number of dimensions: Expected {ndim}, got {actual_ndim}"
        yield ValidationError(message, instance=repr(instance))
def validate_max_ndim(validator, max_ndim, instance, schema):
    """
    Validator for the ``max_ndim`` schema keyword.

    Yields a ``ValidationError`` when ``instance`` has more than
    ``max_ndim`` dimensions, or when its number of dimensions can not be
    determined.
    """
    in_ndim = _get_ndim(instance)
    # _get_ndim returns None for instances it can not interpret; report
    # that as a validation error instead of failing on ``None > int``.
    if in_ndim is None or in_ndim > max_ndim:
        yield ValidationError(
            f"Wrong number of dimensions: Expected max of {max_ndim}, got {in_ndim}",
            instance=repr(instance),
        )
def validate_datatype(validator, datatype, instance, schema):
    """
    Validator for the ``datatype`` schema keyword.

    The datatype of ``instance`` is taken from its inline data, its
    ``"datatype"`` entry or its array dtype.  Unless the schema sets
    ``exact_datatype``, a differing datatype is accepted when it can be
    safely cast to the expected one.

    Yields
    ------
    ValidationError
        One error for each mismatch that is found.

    Raises
    ------
    ValidationError
        If ``instance`` is not recognised as an array.
    """
    if isinstance(instance, list):
        array = inline_data_asarray(instance)
        in_datatype, _ = numpy_dtype_to_asdf_datatype(array.dtype)
    elif isinstance(instance, dict):
        if "datatype" in instance:
            in_datatype = instance["datatype"]
        elif "data" in instance:
            array = inline_data_asarray(instance["data"])
            in_datatype, _ = numpy_dtype_to_asdf_datatype(array.dtype)
        else:
            msg = "Not an array"
            raise ValidationError(msg)
    elif isinstance(instance, (np.ndarray, NDArrayType)):
        in_datatype, _ = numpy_dtype_to_asdf_datatype(instance.dtype)
    else:
        msg = "Not an array"
        raise ValidationError(msg)

    if datatype == in_datatype:
        return

    if schema.get("exact_datatype", False):
        yield ValidationError(f"Expected datatype '{datatype}', got '{in_datatype}'")

    np_datatype = asdf_datatype_to_numpy_dtype(datatype)
    np_in_datatype = asdf_datatype_to_numpy_dtype(in_datatype)

    if not np_datatype.fields:
        # Scalar datatype expected.
        if np_in_datatype.fields:
            yield ValidationError(f"Expected scalar datatype '{datatype}', got '{in_datatype}'")

        if not np.can_cast(np_in_datatype, np_datatype, "safe"):
            yield ValidationError(f"Can not safely cast from '{in_datatype}' to '{datatype}' ")

        return

    # Structured datatype expected.
    if not np_in_datatype.fields:
        yield ValidationError(f"Expected structured datatype '{datatype}', got '{in_datatype}'")
        # There are no columns to compare; stop here so that a consumer
        # that keeps iterating does not hit an error on the missing fields.
        return

    if len(np_in_datatype.fields) != len(np_datatype.fields):
        yield ValidationError(f"Mismatch in number of columns: Expected {len(datatype)}, got {len(in_datatype)}")

    # Compare columns by position.  zip() limits the comparison to the
    # columns present on both sides when the counts differ.
    in_types = [np_in_datatype[name] for name in np_in_datatype.names]
    out_types = [np_datatype[name] for name in np_datatype.names]
    for in_type, out_type in zip(in_types, out_types):
        if not np.can_cast(in_type, out_type, "safe"):
            yield ValidationError(
                "Can not safely cast to expected datatype: "
                f"Expected {numpy_dtype_to_asdf_datatype(out_type)[0]}, "
                f"got {numpy_dtype_to_asdf_datatype(in_type)[0]}",
            )