|
| 1 | +# Copyright 2018-Present The CloudEvents Authors |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | +# not use this file except in compliance with the License. You may obtain |
| 5 | +# a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | +# License for the specific language governing permissions and limitations |
| 13 | +# under the License. |
| 14 | + |
| 15 | + |
| 16 | +import base64 |
| 17 | +import re |
| 18 | +from datetime import datetime |
| 19 | +from json import JSONEncoder, dumps, loads |
| 20 | +from typing import Any, Callable, Final, Optional, Pattern, Union |
| 21 | + |
| 22 | +from dateutil.parser import isoparse # type: ignore[import-untyped] |
| 23 | + |
| 24 | +from cloudevents.core.base import BaseCloudEvent |
| 25 | +from cloudevents.core.formats.base import Format |
| 26 | + |
| 27 | + |
| 28 | +class _JSONEncoderWithDatetime(JSONEncoder): |
| 29 | + """ |
| 30 | + Custom JSON encoder to handle datetime objects in the format required by the CloudEvents spec. |
| 31 | + """ |
| 32 | + |
| 33 | + def default(self, obj: Any) -> Any: |
| 34 | + if isinstance(obj, datetime): |
| 35 | + dt = obj.isoformat() |
| 36 | + # 'Z' denotes a UTC offset of 00:00 see |
| 37 | + # https://www.rfc-editor.org/rfc/rfc3339#section-2 |
| 38 | + if dt.endswith("+00:00"): |
| 39 | + dt = dt.removesuffix("+00:00") + "Z" |
| 40 | + return dt |
| 41 | + |
| 42 | + return super().default(obj) |
| 43 | + |
| 44 | + |
| 45 | +class JSONFormat(Format): |
| 46 | + CONTENT_TYPE: Final[str] = "application/cloudevents+json" |
| 47 | + JSON_CONTENT_TYPE_PATTERN: Pattern[str] = re.compile( |
| 48 | + r"^(application|text)/([a-zA-Z0-9\-\.]+\+)?json(;.*)?$" |
| 49 | + ) |
| 50 | + |
| 51 | + def read( |
| 52 | + self, |
| 53 | + event_factory: Callable[ |
| 54 | + [dict, Optional[Union[dict, str, bytes]]], BaseCloudEvent |
| 55 | + ], |
| 56 | + data: Union[str, bytes], |
| 57 | + ) -> BaseCloudEvent: |
| 58 | + """ |
| 59 | + Read a CloudEvent from a JSON formatted byte string. |
| 60 | +
|
| 61 | + :param event_factory: A factory function to create CloudEvent instances. |
| 62 | + :param data: The JSON formatted byte array. |
| 63 | + :return: The CloudEvent instance. |
| 64 | + """ |
| 65 | + decoded_data: str |
| 66 | + if isinstance(data, bytes): |
| 67 | + decoded_data = data.decode("utf-8") |
| 68 | + else: |
| 69 | + decoded_data = data |
| 70 | + |
| 71 | + event_attributes = loads(decoded_data) |
| 72 | + |
| 73 | + if "time" in event_attributes: |
| 74 | + event_attributes["time"] = isoparse(event_attributes["time"]) |
| 75 | + |
| 76 | + event_data: Union[dict, str, bytes, None] = event_attributes.pop("data", None) |
| 77 | + if event_data is None: |
| 78 | + event_data_base64 = event_attributes.pop("data_base64", None) |
| 79 | + if event_data_base64 is not None: |
| 80 | + event_data = base64.b64decode(event_data_base64) |
| 81 | + |
| 82 | + return event_factory(event_attributes, event_data) |
| 83 | + |
| 84 | + def write(self, event: BaseCloudEvent) -> bytes: |
| 85 | + """ |
| 86 | + Write a CloudEvent to a JSON formatted byte string. |
| 87 | +
|
| 88 | + :param event: The CloudEvent to write. |
| 89 | + :return: The CloudEvent as a JSON formatted byte array. |
| 90 | + """ |
| 91 | + event_data = event.get_data() |
| 92 | + event_dict: dict[str, Any] = dict(event.get_attributes()) |
| 93 | + |
| 94 | + if event_data is not None: |
| 95 | + if isinstance(event_data, (bytes, bytearray)): |
| 96 | + event_dict["data_base64"] = base64.b64encode(event_data).decode("utf-8") |
| 97 | + else: |
| 98 | + datacontenttype = event_dict.get("datacontenttype", "application/json") |
| 99 | + if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype): |
| 100 | + event_dict["data"] = event_data |
| 101 | + else: |
| 102 | + event_dict["data"] = str(event_data) |
| 103 | + |
| 104 | + return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8") |
0 commit comments