-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial implementation of schema and fields (#32)
- Loading branch information
1 parent
f678413
commit 3b9224b
Showing
20 changed files
with
795 additions
and
341 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
from datetime import datetime | ||
from datetime import timezone | ||
from functools import wraps | ||
|
||
from dateutil.parser import parse as dtparse | ||
|
||
from .ondict import ONDict | ||
|
||
|
||
def checktype(f): | ||
@wraps(f) | ||
def deco(cls, value): | ||
if not isinstance(value, cls.ftype): | ||
raise TypeError(f"{value} is not {cls.ftype}") | ||
return f(cls, value) | ||
|
||
return deco | ||
|
||
|
||
class Nullable: | ||
default = None | ||
|
||
@classmethod | ||
def from_obj(cls, value): | ||
if value is None or value == "null": | ||
return None | ||
return super().from_obj(value) | ||
|
||
@classmethod | ||
def to_str(cls, value): | ||
if value is None: | ||
return "null" | ||
return super().to_str(value) | ||
|
||
|
||
class Field: | ||
ftype = None | ||
default = None | ||
|
||
def __init__(self, value=None, doc=None, **kwargs): | ||
super().__init__(**kwargs) | ||
self.value = self.default if value is None else self.from_obj(value) | ||
self.doc = doc | ||
|
||
@classmethod | ||
def from_obj(cls, value): | ||
return cls.ftype(value) | ||
|
||
@classmethod | ||
@checktype | ||
def to_str(cls, value): | ||
return str(value) | ||
|
||
|
||
class Bool(Field): | ||
ftype = bool | ||
default = False | ||
|
||
@classmethod | ||
@checktype | ||
def to_str(cls, value): | ||
return "true" if value else "false" | ||
|
||
|
||
class Datetime(Field): | ||
ftype = datetime | ||
default = datetime.fromtimestamp(0, timezone.utc) | ||
|
||
@classmethod | ||
def from_obj(cls, value): | ||
if isinstance(value, datetime): | ||
pass | ||
elif isinstance(value, str): | ||
value = dtparse(value) | ||
elif isinstance(value, (float, int)): | ||
value = datetime.fromtimestamp(value, timezone.utc) | ||
else: | ||
raise ValueError(f"invalid value for datetime: {value!r}") | ||
return value | ||
|
||
@classmethod | ||
@checktype | ||
def to_str(cls, value): | ||
return value.isoformat() | ||
|
||
|
||
class Float(Field): | ||
ftype = float | ||
default = 0.0 | ||
|
||
|
||
class Int(Field): | ||
ftype = int | ||
default = 0 | ||
|
||
|
||
class Str(Field): | ||
ftype = str | ||
default = "" | ||
|
||
|
||
# fmt: off | ||
class NullableBool(Nullable, Bool): pass | ||
class NullableDatetime(Nullable, Datetime): pass | ||
class NullableFloat(Nullable, Float): pass | ||
class NullableInt(Nullable, Int): pass | ||
class NullableStr(Nullable, Str): pass | ||
# fmt: on | ||
|
||
|
||
def extract_values(d: ONDict) -> ONDict: | ||
"""Make a ONDict with only default values and no other type info.""" | ||
valueonly = ONDict() | ||
valueonly._create = True | ||
for key in d.allkeys(): | ||
v = d[key] | ||
if isinstance(v, Field): | ||
v = v.value | ||
valueonly[key] = v | ||
return valueonly |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,64 @@ | ||
from collections.abc import MutableMapping | ||
from json import dump as _dump | ||
from json import load as _load | ||
from json.decoder import JSONDecodeError | ||
from logging import getLogger | ||
|
||
from .. import fields | ||
from ..ondict import ONDict | ||
from ..typing import IO | ||
from ..typing import Any | ||
from ..typing import Optional | ||
from ..typing import Union | ||
from .utils import escape_dot | ||
|
||
def dump(content, f): | ||
return _dump(content, f) | ||
log = getLogger(__name__) | ||
|
||
|
||
def load(f): | ||
def dump(content: ONDict, f: IO, schema: Optional[ONDict] = None): | ||
schema = schema or {} | ||
con = ONDict() | ||
con._create = True | ||
for key in list(content.allkeys()): | ||
con[key] = _dumpobj(content[key], schema.get(key)) | ||
_dump(con.asdict(), f) | ||
|
||
|
||
def _dumpobj(value, field) -> Any: | ||
if isinstance(field, fields.Field): | ||
if isinstance(field, (fields.Bool, fields.Float, fields.Int, fields.Str)): | ||
pass | ||
elif isinstance(field, (fields.Datetime,)): | ||
value = field.to_str(value) | ||
else: | ||
value = field.to_str(value) | ||
return value | ||
|
||
|
||
def load(f: IO, schema: Optional[ONDict] = None) -> ONDict: | ||
try: | ||
content = _load(f) | ||
except JSONDecodeError: | ||
log.exception("Load error") | ||
content = {} | ||
return content | ||
|
||
def _walk(d, schema): | ||
if not isinstance(d, MutableMapping): | ||
return _loadobj(schema, d) | ||
for key in list(d.keys()): | ||
ekey = escape_dot(key) | ||
d[key] = _walk(d[key], schema.get(ekey, {})) | ||
if ekey != key: | ||
d[ekey] = d[key] | ||
del d[key] | ||
return d | ||
|
||
_walk(content, schema or {}) | ||
|
||
return ONDict(content) | ||
|
||
|
||
def _loadobj(field: Union[fields.Field, Any], value: Any) -> Any: | ||
if isinstance(field, (fields.Field)): | ||
value = field.from_obj(value) | ||
return value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.