Source code for asdf.extension._manager

import sys
from functools import lru_cache

from asdf.tagged import Tagged
from asdf.util import get_class_name, uri_match

from ._extension import ExtensionProxy


def _resolve_type(path):
    """
    Look up the object named by a dotted path without importing anything.

    The lookup only consults ``sys.modules``, so a path such as
    "asdf.AsdfFile" resolves to ``asdf.AsdfFile`` only when the module
    that holds it has already been imported.

    Parameters
    ----------

    path : str
        Path/name of class (for example, "asdf.AsdfFile")

    Returns
    -------

    typ : class or None
        The object found at ``path`` (if its module is already imported)
        or None
    """
    module_name, dot, type_name = path.rpartition(".")
    if not dot:
        # a path without a dot can only name a top-level module
        return sys.modules.get(path)
    if module_name not in sys.modules:
        # never trigger an import just to resolve a path
        return None
    # the attribute may not exist yet (for example a dynamically created
    # class); report that as "not resolvable right now"
    return getattr(sys.modules[module_name], type_name, None)


class ExtensionManager:
    """
    Wraps a list of extensions and indexes their converters
    by tag and by Python type.

    Parameters
    ----------
    extensions : iterable of asdf.extension.Extension
        List of enabled extensions to manage.  Extensions placed earlier
        in the list take precedence.
    """

    def __init__(self, extensions):
        self._extensions = [ExtensionProxy.maybe_wrap(ext) for ext in extensions]
        self._tag_defs_by_tag = {}
        self._converters_by_tag = {}

        # A converter may declare the types it handles either as the
        # class itself or as a class path string.  Class paths let
        # registration proceed without importing every module of every
        # extension just to turn the strings into classes.
        #
        # Class paths are awkward when a package implements a class at a
        # private location and exposes it at a different public one: the
        # private path may change between minor versions, yet inspecting
        # the class usually reports the private path.  asdf's own
        # Converter is an example (public ``asdf.extension.Converter``,
        # class path ``asdf.extension._converter.Converter``).
        #
        # Supporting public paths would mean importing the public path
        # and then registering the private one.  To avoid unnecessary
        # imports (and because instances do not carry their public path)
        # only paths whose module is already imported are resolved here.
        # This is ok because the by-type index is only consulted when
        # serializing an object that is in memory, at which point the
        # public class path has already been imported.

        # Pass 1: walk the extensions in order.  ``setdefault`` keeps the
        # first registration for a key, which gives earlier extensions
        # precedence.  Keys of ``discovered`` are either class path
        # strings or classes.
        discovered = {}
        validators = set()
        for ext in self._extensions:
            for tag_def in ext.tags:
                self._tag_defs_by_tag.setdefault(tag_def.tag_uri, tag_def)
            for converter in ext.converters:
                for tag in converter.tags:
                    self._converters_by_tag.setdefault(tag, converter)
                for key in converter.types:
                    discovered.setdefault(key, converter)
            validators.update(ext.validators)

        # Pass 2: split what was discovered into converters indexed by an
        # actual class and converters still waiting on an unresolved path.
        self._converters_by_class_path = {}
        self._converters_by_type = {}
        for key, converter in discovered.items():
            if not isinstance(key, str):
                self._converters_by_type.setdefault(key, converter)
                continue
            resolved = _resolve_type(key)
            if resolved is None:
                self._converters_by_class_path.setdefault(key, converter)
            else:
                self._converters_by_type.setdefault(resolved, converter)

        self._validator_manager = _get_cached_validator_manager(tuple(validators))

    @property
    def extensions(self):
        """
        Get the list of extensions.

        Returns
        -------
        list of asdf.extension.ExtensionProxy
        """
        return self._extensions

    def handles_tag(self, tag):
        """
        Return `True` if the specified tag is handled by a
        converter.

        Parameters
        ----------
        tag : str
            Tag URI.

        Returns
        -------
        bool
        """
        return tag in self._converters_by_tag

    def handles_type(self, typ):
        """
        Returns `True` if the specified Python type is handled
        by a converter.

        Parameters
        ----------
        typ : type

        Returns
        -------
        bool
        """
        if typ not in self._converters_by_type:
            # the type may belong to a class path that has become
            # resolvable since the last indexing pass
            self._index_converters()
        return typ in self._converters_by_type

    def handles_tag_definition(self, tag):
        """
        Return `True` if the specified tag has a definition.

        Parameters
        ----------
        tag : str
            Tag URI.

        Returns
        -------
        bool
        """
        return tag in self._tag_defs_by_tag

    def get_tag_definition(self, tag):
        """
        Get the tag definition for the specified tag.

        Parameters
        ----------
        tag : str
            Tag URI.

        Returns
        -------
        asdf.extension.TagDefinition

        Raises
        ------
        KeyError
            Unrecognized tag URI.
        """
        try:
            tag_def = self._tag_defs_by_tag[tag]
        except KeyError:
            msg = f"No support available for YAML tag '{tag}'.  You may need to install a missing extension."
            raise KeyError(msg) from None
        return tag_def

    def get_converter_for_tag(self, tag):
        """
        Get the converter for the specified tag.

        Parameters
        ----------
        tag : str
            Tag URI.

        Returns
        -------
        asdf.extension.Converter

        Raises
        ------
        KeyError
            Unrecognized tag URI.
        """
        try:
            converter = self._converters_by_tag[tag]
        except KeyError:
            msg = f"No support available for YAML tag '{tag}'.  You may need to install a missing extension."
            raise KeyError(msg) from None
        return converter

    def get_converter_for_type(self, typ):
        """
        Get the converter for the specified Python type.

        Parameters
        ----------
        typ : type

        Returns
        -------
        asdf.extension.Converter

        Raises
        ------
        KeyError
            Unrecognized type.
        """
        if typ not in self._converters_by_type:
            # give pending class paths a chance to resolve before failing
            self._index_converters()

        try:
            converter = self._converters_by_type[typ]
        except KeyError:
            msg = (
                f"No support available for Python type '{get_class_name(typ, instance=False)}'.  "
                "You may need to install or enable an extension."
            )
            raise KeyError(msg) from None
        return converter

    def _index_converters(self):
        """
        Search _converters_by_class_path for paths (strings)
        that refer to classes that are currently imported.
        Each path that resolves is removed from
        _converters_by_class_path and its converter is added to
        _converters_by_type (unless that class already has a
        converter).
        """
        pending = self._converters_by_class_path
        # iterate over a snapshot because entries are removed while looping
        for class_path in tuple(pending):
            resolved = _resolve_type(class_path)
            if resolved is not None:
                self._converters_by_type.setdefault(resolved, pending.pop(class_path))

    @property
    def validator_manager(self):
        return self._validator_manager
def get_cached_extension_manager(extensions):
    """
    Get a previously created ExtensionManager for the specified
    extensions, or create and cache one if necessary.  Building
    the manager is expensive, so it helps performance to reuse
    it when possible.

    Parameters
    ----------
    extensions : list of asdf.extension.Extension

    Returns
    -------
    asdf.extension.ExtensionManager
    """
    # ExtensionProxy is already imported at module level, so no
    # function-scope import is needed here.
    #
    # The tuple makes the extensions hashable so that we
    # can pass them to the lru_cache method.  The ExtensionProxy
    # overrides __hash__ to return the hashed object id of the wrapped
    # extension, so this method will only return the same
    # ExtensionManager if the list contains identical extension
    # instances in identical order.
    extensions = tuple(ExtensionProxy.maybe_wrap(e) for e in extensions)

    return _get_cached_extension_manager(extensions)
@lru_cache
def _get_cached_extension_manager(extensions):
    # ``extensions`` is the cache key, so it must be hashable
    # (get_cached_extension_manager passes a tuple)
    return ExtensionManager(extensions)


class ValidatorManager:
    """
    Wraps a list of custom validators and indexes them by schema property.

    Parameters
    ----------
    validators : iterable of asdf.extension.Validator
        List of validators to manage.
    """

    def __init__(self, validators):
        self._validators = list(validators)

        # group the validators by the schema property that triggers them
        by_property = {}
        for validator in self._validators:
            by_property.setdefault(validator.schema_property, set()).add(validator)
        self._validators_by_schema_property = by_property

        # one jsonschema-style callable per schema property
        self._jsonschema_validators_by_schema_property = {
            schema_property: self._get_jsonschema_validator(schema_property) for schema_property in by_property
        }

    def validate(self, schema_property, schema_property_value, node, schema):
        """
        Validate an ASDF tree node against custom validators for a schema property.

        Parameters
        ----------
        schema_property : str
            Name of the schema property (identifies the validator(s) to use).
        schema_property_value : object
            Value of the schema property.
        node : asdf.tagged.Tagged
            The ASDF node to validate.
        schema : dict
            The schema object that contains the property that triggered
            the validation.

        Yields
        ------
        asdf.exceptions.ValidationError
        """
        # a property without registered validators yields nothing
        candidates = self._validators_by_schema_property.get(schema_property, ())
        for validator in candidates:
            if _validator_matches(validator, node):
                yield from validator.validate(schema_property_value, node, schema)

    def get_jsonschema_validators(self):
        """
        Get a dictionary of validator methods suitable for use
        with the jsonschema library.

        Returns
        -------
        dict of str: callable
        """
        # hand out a copy so callers cannot alter the internal index
        return dict(self._jsonschema_validators_by_schema_property)

    def _get_jsonschema_validator(self, schema_property):
        # bind ``schema_property`` in a closure; the first positional
        # argument supplied by the caller is ignored
        def _validator(_, schema_property_value, node, schema):
            return self.validate(schema_property, schema_property_value, node, schema)

        return _validator


def _validator_matches(validator, node):
    # the "**" tag pattern matches every node, tagged or not
    if any(pattern == "**" for pattern in validator.tags):
        return True
    # any other pattern can only match a tagged node
    return isinstance(node, Tagged) and any(uri_match(pattern, node._tag) for pattern in validator.tags)


@lru_cache
def _get_cached_validator_manager(validators):
    # ``validators`` is the cache key, so it must be hashable
    return ValidatorManager(validators)