Source code for asdf.extension._manager

from functools import lru_cache

from asdf.tagged import Tagged
from asdf.util import get_class_name, uri_match

from ._extension import ExtensionProxy


[docs] class ExtensionManager: """ Wraps a list of extensions and indexes their converters by tag and by Python type. Parameters ---------- extensions : iterable of asdf.extension.Extension List of enabled extensions to manage. Extensions placed earlier in the list take precedence. """ def __init__(self, extensions): self._extensions = [ExtensionProxy.maybe_wrap(e) for e in extensions] self._tag_defs_by_tag = {} self._converters_by_tag = {} # This dict has both str and type keys: self._converters_by_type = {} validators = set() for extension in self._extensions: for tag_def in extension.tags: if tag_def.tag_uri not in self._tag_defs_by_tag: self._tag_defs_by_tag[tag_def.tag_uri] = tag_def for converter in extension.converters: for tag in converter.tags: if tag not in self._converters_by_tag: self._converters_by_tag[tag] = converter for typ in converter.types: if isinstance(typ, str): if typ not in self._converters_by_type: self._converters_by_type[typ] = converter else: type_class_name = get_class_name(typ, instance=False) if typ not in self._converters_by_type and type_class_name not in self._converters_by_type: self._converters_by_type[typ] = converter self._converters_by_type[type_class_name] = converter validators.update(extension.validators) self._validator_manager = _get_cached_validator_manager(tuple(validators)) @property def extensions(self): """ Get the list of extensions. Returns ------- list of asdf.extension.ExtensionProxy """ return self._extensions
[docs] def handles_tag(self, tag): """ Return `True` if the specified tag is handled by a converter. Parameters ---------- tag : str Tag URI. Returns ------- bool """ return tag in self._converters_by_tag
[docs] def handles_type(self, typ): """ Returns `True` if the specified Python type is handled by a converter. Parameters ---------- typ : type Returns ------- bool """ return typ in self._converters_by_type or get_class_name(typ, instance=False) in self._converters_by_type
[docs] def handles_tag_definition(self, tag): """ Return `True` if the specified tag has a definition. Parameters ---------- tag : str Tag URI. Returns ------- bool """ return tag in self._tag_defs_by_tag
[docs] def get_tag_definition(self, tag): """ Get the tag definition for the specified tag. Parameters ---------- tag : str Tag URI. Returns ------- asdf.extension.TagDefinition Raises ------ KeyError Unrecognized tag URI. """ try: return self._tag_defs_by_tag[tag] except KeyError: msg = f"No support available for YAML tag '{tag}'. You may need to install a missing extension." raise KeyError(msg) from None
[docs] def get_converter_for_tag(self, tag): """ Get the converter for the specified tag. Parameters ---------- tag : str Tag URI. Returns ------- asdf.extension.Converter Raises ------ KeyError Unrecognized tag URI. """ try: return self._converters_by_tag[tag] except KeyError: msg = f"No support available for YAML tag '{tag}'. You may need to install a missing extension." raise KeyError(msg) from None
[docs] def get_converter_for_type(self, typ): """ Get the converter for the specified Python type. Parameters ---------- typ : type Returns ------- asdf.extension.Converter Raises ------ KeyError Unrecognized type. """ try: return self._converters_by_type[typ] except KeyError: class_name = get_class_name(typ, instance=False) try: return self._converters_by_type[class_name] except KeyError: msg = ( f"No support available for Python type '{get_class_name(typ, instance=False)}'. " "You may need to install or enable an extension." ) raise KeyError(msg) from None
@property def validator_manager(self): return self._validator_manager
[docs] def get_cached_extension_manager(extensions): """ Get a previously created ExtensionManager for the specified extensions, or create and cache one if necessary. Building the manager is expensive, so it helps performance to reuse it when possible. Parameters ---------- extensions : list of asdf.extension.Extension Returns ------- asdf.extension.ExtensionManager """ from ._extension import ExtensionProxy # The tuple makes the extensions hashable so that we # can pass them to the lru_cache method. The ExtensionProxy # overrides __hash__ to return the hashed object id of the wrapped # extension, so this will method will only return the same # ExtensionManager if the list contains identical extension # instances in identical order. extensions = tuple(ExtensionProxy.maybe_wrap(e) for e in extensions) return _get_cached_extension_manager(extensions)
@lru_cache def _get_cached_extension_manager(extensions): return ExtensionManager(extensions) class ValidatorManager: """ Wraps a list of custom validators and indexes them by schema property. Parameters ---------- validators : iterable of asdf.extension.Validator List of validators to manage. """ def __init__(self, validators): self._validators = list(validators) self._validators_by_schema_property = {} for validator in self._validators: if validator.schema_property not in self._validators_by_schema_property: self._validators_by_schema_property[validator.schema_property] = set() self._validators_by_schema_property[validator.schema_property].add(validator) self._jsonschema_validators_by_schema_property = {} for schema_property in self._validators_by_schema_property: self._jsonschema_validators_by_schema_property[schema_property] = self._get_jsonschema_validator( schema_property, ) def validate(self, schema_property, schema_property_value, node, schema): """ Validate an ASDF tree node against custom validators for a schema property. Parameters ---------- schema_property : str Name of the schema property (identifies the validator(s) to use). schema_property_value : object Value of the schema property. node : asdf.tagged.Tagged The ASDF node to validate. schema : dict The schema object that contains the property that triggered the validation. Yields ------ asdf.exceptions.ValidationError """ if schema_property in self._validators_by_schema_property: for validator in self._validators_by_schema_property[schema_property]: if _validator_matches(validator, node): yield from validator.validate(schema_property_value, node, schema) def get_jsonschema_validators(self): """ Get a dictionary of validator methods suitable for use with the jsonschema library. Returns ------- dict of str: callable """ return dict(self._jsonschema_validators_by_schema_property) def _get_jsonschema_validator(self, schema_property): def _validator(_, schema_property_value, node, schema): return self.validate(schema_property, schema_property_value, node, schema) return _validator def _validator_matches(validator, node): if any(t == "**" for t in validator.tags): return True if not isinstance(node, Tagged): return False return any(uri_match(t, node._tag) for t in validator.tags) @lru_cache def _get_cached_validator_manager(validators): return ValidatorManager(validators)