import enum
import logging
from collections import OrderedDict
from typing import Union
from typedpy.commons import Constant, default_factories, first_in, wrap_val
from typedpy.fields import (
FunctionCall,
Map,
Negative,
NonNegative,
NonPositive,
Positive,
StructureReference,
Integer,
Number,
Float,
Array,
String,
Boolean,
AllOf,
OneOf,
AnyOf,
NotField,
Tuple,
Set,
Enum,
)
from typedpy.extfields import DateString
from typedpy.serialization.mappers import (
DoNotSerialize,
aggregate_serialization_mappers,
)
from typedpy.structures import (
ADDITIONAL_PROPERTIES,
NoneField,
Structure,
TypedPyDefaults,
ClassReference,
Field,
)
# JSON Schema keyword names used throughout this module.
SCHEMA_PATTERN_PROPERTIES = "patternProperties"
SCHEMA_ADDITIONAL_PROPERTIES = "additionalProperties"
# NOTE(review): the name below is misspelled ("PROPETIES") but is kept as-is
# for backward compatibility, since other modules may import it.
SCHEMA_PROPETIES = "properties"
SCHEMA_PROPERTIES = SCHEMA_PROPETIES  # correctly-spelled alias; prefer this one
SCHEMA_PROPERTY_NAMES = "propertyNames"
def get_mapper(field_cls):
    """Return the Mapper class that handles schema conversion for a Field class.

    Walks ``field_cls.__mro__`` so that a subclass of a supported field
    resolves to the mapper of its nearest supported ancestor.

    Raises:
        NotImplementedError: if no mapper is registered for the class.
    """
    registry = {
        StructureReference: StructureReferenceMapper,
        Integer: IntegerMapper,
        Number: NumberMapper,
        Float: NumberMapper,
        Array: ArrayMapper,
        Boolean: BooleanMapper,
        Enum: EnumMapper,
        String: StringMapper,
        DateString: DateStringMapper,
        AllOf: AllOfMapper,
        AnyOf: AnyOfMapper,
        OneOf: OneOfMapper,
        NotField: NotFieldMapper,
        Tuple: ArrayMapper,
        Set: ArrayMapper,
        Map: MapMapper,
    }
    matches = (
        registry[base]
        for base in field_cls.__mro__
        if issubclass(base, Field) and base in registry
    )
    found = next(matches, None)
    if found is None:
        raise NotImplementedError(f"schema mapping is not implemented for {field_cls}")
    return found
def _map_class_reference(reference, definitions_schema):
    """Register the referenced Structure's schema under definitions and return a $ref to it."""
    referenced_cls = getattr(reference, "_ty")
    schema, _ = structure_to_schema(referenced_cls, definitions_schema)
    definitions_schema[referenced_cls.__name__] = schema
    return {"$ref": f"#/definitions/{referenced_cls.__name__}"}
def _const_to_schema(field: Constant):
    """Map a Constant field to a single-value JSON-schema "enum"."""
    constant_value = field()
    if isinstance(constant_value, enum.Enum):
        # enum constants are serialized by name
        constant_value = constant_value.name
    return {"enum": [constant_value]}
def convert_to_schema(field, definitions_schema, serialization_mapper: dict = None):
    """
    In case field is None, should return None.
    Should deal with a list of fields, as well as a single one
    """
    if field is None:
        return None
    if isinstance(field, ClassReference):
        # references to other Structures become $ref entries under definitions
        return _map_class_reference(field, definitions_schema)
    if isinstance(field, list):
        return [
            convert_to_schema(
                member, definitions_schema, serialization_mapper=serialization_mapper
            )
            for member in field
        ]
    if isinstance(field, Constant):
        return _const_to_schema(field)
    # a field may define its own custom JSON-schema mapping
    custom_schema = field.to_json_schema()
    if custom_schema:
        return custom_schema
    mapper_cls = get_mapper(field.__class__)
    return mapper_cls(field).to_schema(
        definitions_schema, serialization_mapper=serialization_mapper
    )
def _validated_mapped_value(mapper, key):
if key in mapper:
key_mapper = mapper[key]
if isinstance(key_mapper, (FunctionCall,)):
logging.error(
f"{key} is mapped to a function in serialization mapper- "
"This is unsupported by code-to-schema conversion. "
"You will need to manually fix it."
)
elif key_mapper is DoNotSerialize or isinstance(key_mapper, Constant):
return key_mapper
elif not isinstance(key_mapper, (FunctionCall, str)):
raise TypeError("mapper must have a FunctionCall or a string")
return None
def structure_to_schema(structure, definitions_schema, serialization_mapper=None):
    """
    Generate JSON schema from :class:`Structure`
    `See working examples in tests. <https://github.com/loyada/typedpy/tree/master/tests/schema_mapping>`_

    Arguments:
        structure( subclass of :class:`Structure` ):
            the class

        definitions_schema(dict):
            the json schema for all the definitions (typically under "#/definitions" in the schema).
            If it is the first call, just use an empty dict.

    Returns:
        A tuple of 2. The first is the schema of the structure; the second is the
        schema of the referenced definitions. Note that definitions_schema is
        also updated in place.
    """
    # json schema draft4 does not support inheritance, so we don't need to worry about that
    if not issubclass(structure, Structure):
        raise TypeError("Expected a Structure subclass")
    field_by_name = structure.get_all_fields_by_name()
    required = getattr(structure, "_required", list(field_by_name.keys()))
    additional_props = getattr(
        structure, ADDITIONAL_PROPERTIES, TypedPyDefaults.additional_properties_default
    )
    mapper = aggregate_serialization_mappers(structure, serialization_mapper) or {}
    if getattr(structure, "_additional_serialization") != getattr(
        Structure, "_additional_serialization"
    ):
        logging.warning(
            "mapping to schema does not support _additional_serialization method. You"
            " will have to edit it manually."
        )
    # A structure with exactly one field, which is required, and with no
    # additional properties, is treated as a simple wrapper: its schema is the
    # schema of the wrapped field itself.
    if (
        len(field_by_name) == 1
        and set(required) == set(field_by_name.keys())
        and additional_props is False
    ):
        _, value = first_in(field_by_name.items())
        return (
            convert_to_schema(value, definitions_schema),
            definitions_schema,
        )
    else:
        fields_schema = OrderedDict([("type", "object")])
        fields_schema["properties"] = {}
        properties = fields_schema["properties"]
        _generate_schema_for_fields_internal(
            definitions_schema, field_by_name, mapper, properties, required
        )
        fields_schema.update(
            OrderedDict(
                [
                    ("required", sorted(required)),
                    ("additionalProperties", additional_props),
                ]
            )
        )
        return fields_schema, definitions_schema
def _generate_schema_for_fields_internal(
    definitions_schema, field_by_name, mapper, properties, required
):
    # Populate `properties` with a JSON schema per field, applying the
    # serialization mapper's key renames. Mutates both `properties` and
    # `required` in place.
    for key, field in field_by_name.items():
        # a plain-string mapping renames the serialized key
        mapped_key = (
            mapper[key] if key in mapper and isinstance(mapper[key], (str,)) else key
        )
        mapped_value = _validated_mapped_value(mapper, key)
        if mapped_value is DoNotSerialize or isinstance(mapped_value, Constant):
            # the field is dropped (or replaced with a constant) when
            # serializing, so it must not appear as required in the schema
            if mapped_key in required:
                required.pop(required.index(mapped_key))
        else:
            if key in required:
                # express "required" in terms of the serialized (mapped) names
                required[required.index(key)] = mapped_key
            sub_mapper = mapper.get(f"{key}._mapper", {})
            sub_schema = convert_to_schema(
                field, definitions_schema, serialization_mapper=sub_mapper
            )
            default_raw = getattr(field, "_default", None)
            if default_raw is not None:
                # defaults may be factories (callables); resolve to a concrete value
                default_val = default_raw() if callable(default_raw) else default_raw
                if isinstance(default_val, enum.Enum):
                    default_val = default_val.name
                sub_schema["default"] = default_val
                # NOTE(review): a field with a default is ADDED to "required" -
                # presumably because serialization always emits it; confirm.
                if mapped_key not in required:
                    required.append(mapped_key)
            properties[mapped_key] = sub_schema
# Maps JSON-schema "type" values to the typedpy Field class emitted when
# generating code from a schema.
type_name_to_field = {
    "object": StructureReference,
    "integer": Integer,
    "number": Number,
    "array": Array,
    "string": String,
    "boolean": Boolean,
}

# JSON-schema combinator keywords and the multi-field typedpy classes they map to.
multivals = {"allOf": AllOf, "anyOf": AnyOf, "oneOf": OneOf, "not": NotField}
@default_factories
def convert_to_field_code(schema, definitions, additional_fields=list):
    """
    In case schema is None, should return None.
    Should deal with a schema that is a dict, as well as one that is a list
    """
    if schema is None:
        return None
    if isinstance(schema, list):
        rendered = (convert_to_field_code(entry, definitions) for entry in schema)
        return f"[{', '.join(rendered)}]"
    if "$ref" in schema:
        # a $ref points into "#/definitions/<name>"; the generated code
        # refers to that class simply by its name
        return schema["$ref"][len("#/definitions/") :]
    return _convert_field_to_schema_code_internal(
        additional_fields, definitions, schema
    )
def _convert_field_to_schema_code_internal(additional_fields, definitions, schema):
    # Resolve the typedpy Field class (cls) to emit, and the mapper that knows
    # how to extract its constructor parameters from the schema.
    if any(multival in schema for multival in multivals):
        # combinator schema (allOf/anyOf/oneOf/not); if several combinator
        # keywords are present, the last matching one wins
        for k, the_class in multivals.items():
            if k in schema:
                cls = the_class
                mapper = MultiFieldMapper
    elif "enum" in schema:
        cls = Enum
        mapper = get_mapper(cls)
    else:
        object_type = schema.get("type", "object")
        if object_type == "object":
            # objects with declared properties become nested StructureReferences;
            # otherwise they are treated as a generic Map
            if SCHEMA_PROPETIES in schema:
                cls = StructureReference
            else:
                cls = Map
        else:
            # custom field classes get first crack at non-object schemas
            for c in additional_fields:
                custom_mapping = c.from_json_schema(schema)
                if custom_mapping:
                    return custom_mapping
            cls = type_name_to_field[schema.get("type", "object")]
        mapper = get_mapper(cls)
    params_list = mapper.get_paramlist_from_schema(schema, definitions)
    _handle_schema_default_to_code(params_list, schema)
    params_as_string = ", ".join([f"{name}={val}" for (name, val) in params_list])
    return f"{cls.__name__}({params_as_string})"
def _handle_schema_default_to_code(params_list, schema):
if "default" in schema:
default_val = schema["default"]
if isinstance(default_val, (list, dict)):
default_val = f"lambda: {default_val}"
else:
default_val = wrap_val(default_val)
params_list.append(("default", default_val))
@default_factories
def schema_to_struct_code(
    struct_name, schema, definitions_schema, additional_fields=list
):
    """
    Generate code for the main class that maps to the given JSON schema.
    The main struct_name can include references to structures defined in
    definitions_schema, under "#/definitions/".

    Arguments:
        struct_name(str):
            the name of the main :class:`Structure` to be created
        schema(dict):
            the json schema of the main Structure that need to be defined
        definitions_schema(dict):
            schema for definitions of objects that can be referred to in the main schema. If none exist,
            just use an empty dict.
        additional_fields(list):
            additional Types of Fields with custom schema mapping that can appear in the schema. These
            have to implement the class method from_json_schema(), which should return a string of the code
            the Schema is mapping to.

    Returns:
        A string with the code of the class. This can either be executed directly,
        using exec(), or written to a file.
        The "description" property, if exists, is mapped to the docstring of the class.
        If you write to a file, the higher level :func:`write_code_from_schema` is preferable.

    Note: In case schema is None, should return None.
    Deals with a schema that is a dict, as well as one that is a list
    """
    body = [f"class {struct_name}(Structure):"]
    body += (
        [f' """\n {schema.get("description")}\n """\n']
        if "description" in schema
        else []
    )
    body += (
        [" _additional_properties = False"]
        if not schema.get("additionalProperties", True)
        else []
    )
    # for non-object schemas the single generated field is named "wrapped"
    required = (
        schema.get("required", None)
        if schema.get("type", "object") == "object"
        else ["wrapped"]
    )
    the_type = schema.get("type", "object" if "properties" in schema else None)
    if the_type == "object":
        properties = schema.get("properties", {})
        for name, sch in properties.items():
            # a property with a default is not required; drop it from the list.
            # (fixed: guard against a schema with no "required" at all, which
            # previously raised TypeError on `name in None`)
            if "default" in sch and required is not None and name in required:
                required.remove(name)
            body += [
                f" {name}: {convert_to_field_code(sch, definitions_schema, additional_fields=additional_fields)}"
            ]
    else:
        body += [
            f" wrapped = {convert_to_field_code(schema, definitions_schema, additional_fields=additional_fields)}"
        ]
    body += ["", f" _required = {required}"] if required is not None else []
    return "\n".join(body)
@default_factories
def schema_definitions_to_code(schema, additional_fields=list):
    """
    Generate code for the classes in the definitions that map to the given JSON schema.
    `See working example in test_schema_to_code.py.
    <https://github.com/loyada/typedpy/tree/master/tests/test_schema_to_code.py>`_

    Arguments:
        schema(dict):
            the json schema of the various Structures that need to be defined

    Returns:
        A string with the code. This can either be executed directly, using exec(), or written to a file.
        If you write to a file, the higher level :func:`write_code_from_schema` is preferable.
    """
    code = []
    for name, sch in schema.items():
        code.append(
            schema_to_struct_code(name, sch, schema, additional_fields=additional_fields)
        )
    return "\n\n\n".join(code)
@default_factories
def write_code_from_schema(
    schema, definitions_schema, filename, class_name, additional_fields=list
):
    """
    Generate code from schema and write it to a file.

    Example:

    .. code-block:: python

        write_code_from_schema(
            schema,
            definitions,
            "generated_sample.py",
            "Poo",
            additional_fields=[CustomField1, CustomField2]
        )

    Arguments:
        schema(dict):
            the json schema for the main structure
        definitions_schema(dict):
            the json schema for all the definitions (typically under "#/definitions" in the schema.
            These can be referred to from the main schema
        filename(str):
            the file name for the output. Typically should end with ".py".
        class_name(str):
            the main Structure name
        additional_fields(list[Type[Field]]):
            additional field classes with custom mapping
    """
    supporting_classes = schema_definitions_to_code(
        definitions_schema, additional_fields=additional_fields
    )
    structure_code = schema_to_struct_code(
        class_name, schema, definitions_schema, additional_fields=additional_fields
    )
    with open(filename, "w", encoding="utf-8") as fout:
        # the generated module star-imports typedpy so all field names resolve
        fout.write("from typedpy import *\n\n\n")
        if definitions_schema:
            fout.write(supporting_classes)
            fout.write("\n\n# ********************\n\n\n")
        fout.write(structure_code)
        fout.write("\n")
class Mapper:
    """Base class for Field-to-JSON-schema mappers.

    Wraps the typedpy Field instance being converted; subclasses implement
    to_schema() and/or get_paramlist_from_schema().
    """

    def __init__(self, value):
        self.value = value
class StructureReferenceMapper(Mapper):
    """Maps a StructureReference field to/from an inline "object" schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        params = []
        if not schema.get(SCHEMA_ADDITIONAL_PROPERTIES, True):
            params.append((ADDITIONAL_PROPERTIES, False))
        required = schema.get("required", None)
        if required is not None:
            params.append(("_required", required))
        params.extend(
            (attr_name, convert_to_field_code(attr_schema, definitions))
            for attr_name, attr_schema in schema.get("properties", {}).items()
        )
        return params

    def to_schema(self, definitions, serialization_mapper):
        # the embedded structure has a generated class under "_newclass"
        schema, _ = structure_to_schema(
            getattr(self.value, "_newclass"), definitions, serialization_mapper
        )
        schema["type"] = "object"
        return schema
class NumberMapper(Mapper):
    """Maps Number/Float fields to/from a "number" JSON schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        # NOTE(review): typedpy uses "multiplesOf"; the JSON Schema spec spells
        # the keyword "multipleOf" - confirm which is intended.
        keywords = ("multiplesOf", "minimum", "maximum", "exclusiveMaximum")
        return [(name, schema[name]) for name in keywords if schema.get(name) is not None]

    def to_schema(self, definitions, serialization_mapper):
        field = self.value

        def lower_bound():
            # an explicit minimum wins; otherwise derive one from the field class
            if field.minimum is not None:
                return field.minimum
            if isinstance(field, NonNegative):
                return 0
            if isinstance(field, Positive):
                return 1 if isinstance(field, Integer) else 0.000001
            return None

        def upper_bound():
            if field.maximum is not None:
                return field.maximum
            if isinstance(field, NonPositive):
                return 0
            if isinstance(field, Negative):
                return -1 if isinstance(field, Integer) else -0.000001
            return None

        schema = {
            "type": "number",
            "multiplesOf": field.multiplesOf,
            "minimum": lower_bound(),
            "maximum": upper_bound(),
            "exclusiveMaximum": field.exclusiveMaximum,
        }
        return {key: val for key, val in schema.items() if val is not None}
class MapMapper(Mapper):
    """Maps a Map field to/from an "object" schema using patternProperties /
    additionalProperties / propertyNames."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        additional_properties = schema.get(SCHEMA_ADDITIONAL_PROPERTIES, None)
        pattern_properties = schema.get(SCHEMA_PATTERN_PROPERTIES, None)
        # fixed: previously read SCHEMA_PATTERN_PROPERTIES again (copy-paste
        # bug), so propertyNames was never picked up
        property_names = schema.get(SCHEMA_PROPERTY_NAMES, {})
        if pattern_properties and (
            len(pattern_properties) > 1 or additional_properties or property_names
        ):
            raise NotImplementedError("Conversion for this map is unsupported")
        if not any([additional_properties, property_names, pattern_properties]):
            return []
        key_type = (
            convert_to_field_code({**property_names, "type": "string"}, globals())
            if property_names
            else "String()"
        )
        adjusted_key_type = (
            f"String(pattern='{list(pattern_properties.keys())[0]}')"
            if pattern_properties
            else key_type
        )
        if additional_properties:
            value_type = convert_to_field_code(additional_properties, definitions)
        elif pattern_properties:
            # fixed: an unreachable "if additional_properties" branch here meant
            # the pattern's value schema was ignored and "Anything" was emitted
            value_type = convert_to_field_code(
                first_in(pattern_properties.values()), definitions
            )
        else:
            value_type = "Anything"
        items = f"[{adjusted_key_type}, {value_type}]"
        params = {
            "items": items,
            "maxItems": schema.get("maxItems", None),
            "minItems": schema.get("minItems", None),
        }
        return list((k, v) for k, v in params.items() if v is not None)

    def to_schema(self, definitions, serialization_mapper):
        value = self.value
        params = {
            "type": "object",
        }
        if value.items:
            keys, values = value.items
            if not isinstance(keys, String):
                raise TypeError("JSON supports only Strings as keys")
            # translate key length constraints into a regex quantifier suffix
            suffix = ""
            if keys.maxLength or keys.minLength:
                suffix = f"{{{keys.minLength or ''}, {keys.maxLength or ''}}}"
            pattern_props = f"{keys.pattern or ''}{suffix}" or None
            values_schema = convert_to_schema(
                values, definitions, serialization_mapper=serialization_mapper
            )
            if pattern_props:
                # patternProperties is an object mapping each pattern to its
                # schema (fixed: the schema was previously assigned without the
                # pattern key, which broke round-tripping)
                params[SCHEMA_PATTERN_PROPERTIES] = {pattern_props: values_schema}
            elif values_schema:
                params[SCHEMA_ADDITIONAL_PROPERTIES] = values_schema
        # NOTE(review): maxItems/minItems are not standard JSON-Schema object
        # keywords (maxProperties/minProperties are), but they round-trip with
        # get_paramlist_from_schema above.
        params["maxItems"] = value.maxItems
        params["minItems"] = value.minItems
        return {k: v for k, v in params.items() if v is not None}
class IntegerMapper(NumberMapper):
    """Like NumberMapper, but emits "integer" as the schema type."""

    def to_schema(self, definitions, serialization_mapper):
        schema = super().to_schema(definitions, serialization_mapper)
        schema["type"] = "integer"
        return schema
class BooleanMapper(Mapper):
    """Maps a Boolean field to/from a "boolean" schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        # Boolean takes no constructor parameters
        return []

    def to_schema(self, definitions, serialization_mapper):
        return {"type": "boolean"}
class StringMapper(Mapper):
    """Maps a String field to/from a "string" schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        constraints = [
            ("minLength", schema.get("minLength", None)),
            ("maxLength", schema.get("maxLength", None)),
            # wrap_val quotes string values so they are valid code literals
            ("pattern", wrap_val(schema.get("pattern", None))),
        ]
        return [(name, val) for name, val in constraints if val is not None]

    def to_schema(self, definitions, serialization_mapper):
        field = self.value
        schema = {
            "type": "string",
            "minLength": field.minLength,
            "maxLength": field.maxLength,
            "pattern": field.pattern,
        }
        return {name: val for name, val in schema.items() if val is not None}
class DateStringMapper(Mapper):
    """Maps a DateString field to a "string" schema constrained to YYYY-MM-DD."""

    def to_schema(self, definitions, serialization_mapper):
        # both values are constants, so no None-filtering is needed
        return {
            "type": "string",
            "pattern": r"^([12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01]))$",
        }
class ArrayMapper(Mapper):
    """Maps Array/Tuple/Set fields to/from an "array" schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        options = [
            ("uniqueItems", schema.get("uniqueItems", None)),
            ("additionalItems", schema.get("additionalItems", None)),
            ("items", convert_to_field_code(schema.get("items", None), definitions)),
        ]
        return [(name, val) for name, val in options if val is not None]

    def to_schema(self, definitions, serialization_mapper):
        field = self.value
        items_schema = convert_to_schema(field.items, definitions, serialization_mapper)
        if isinstance(field, Tuple):
            # a tuple has a fixed shape: one schema per position, no extras
            schema = {
                "type": "array",
                "uniqueItems": field.uniqueItems,
                "additionalItems": False,
                "items": items_schema,
            }
        elif isinstance(field, Set):
            # a set is an array whose members are unique
            schema = {
                "type": "array",
                "uniqueItems": True,
                "maxItems": field.maxItems,
                "minItems": field.minItems,
                "items": items_schema,
            }
        else:
            schema = {
                "type": "array",
                "uniqueItems": field.uniqueItems,
                "additionalItems": field.additionalItems,
                "maxItems": field.maxItems,
                "minItems": field.minItems,
                "items": items_schema,
            }
        return {k: v for k, v in schema.items() if v is not None}
class EnumMapper(Mapper):
    """Maps an Enum field to/from an "enum" schema."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        return [("values", schema.get("enum", None))]

    def to_schema(self, definitions, serialization_mapper):
        by_value = getattr(self.value, "serialization_by_value", False)

        def as_json_value(member) -> Union[str, int, float]:
            if isinstance(member, enum.Enum):
                # enum members serialize by value or by name, per field config
                return member.value if by_value else member.name
            if not isinstance(member, (int, str, float)):
                raise TypeError("enum must be an enum, str, or number")
            return member

        schema = {"enum": [as_json_value(v) for v in self.value.values]}
        return {k: v for k, v in schema.items() if v is not None}
class MultiFieldMapper:
    """Extracts constructor parameters for combinator fields (AllOf/AnyOf/OneOf/NotField)."""

    @staticmethod
    def get_paramlist_from_schema(schema, definitions):
        # a combinator schema has a single key (e.g. "allOf") whose value is
        # the list of sub-schemas
        sub_schemas = next(iter(schema.values()))
        return [("fields", convert_to_field_code(sub_schemas, definitions))]
class AllOfMapper(Mapper):
    """Maps an AllOf field to an "allOf" schema."""

    def to_schema(self, definitions, serialization_mapper):
        sub_schemas = convert_to_schema(
            self.value._fields, definitions, serialization_mapper
        )
        return {"allOf": sub_schemas}
class OneOfMapper(Mapper):
    """Maps a OneOf field to a "oneOf" schema."""

    def to_schema(self, definitions, serialization_mapper):
        sub_schemas = convert_to_schema(
            self.value._fields, definitions, serialization_mapper
        )
        return {"oneOf": sub_schemas}
class AnyOfMapper(Mapper):
    """Maps an AnyOf field to an "anyOf" schema.

    AnyOf[X, None] (typedpy's "optional") collapses to X's schema directly.
    """

    def to_schema(self, definitions, serialization_mapper):
        fields = self.value._fields
        if len(fields) == 2 and fields[1].__class__ == NoneField:
            # optional field: map to the wrapped field's schema, not an anyOf
            return convert_to_schema(fields[0], definitions, serialization_mapper)
        return {"anyOf": convert_to_schema(fields, definitions, serialization_mapper)}
class NotFieldMapper(Mapper):
    """Maps a NotField to a "not" schema."""

    def to_schema(self, definitions, serialization_mapper):
        sub_schemas = convert_to_schema(
            self.value._fields, definitions, serialization_mapper
        )
        return {"not": sub_schemas}