diff --git a/src/CloudNative.CloudEvents/CloudEventFormatters.cs b/src/CloudNative.CloudEvents/CloudEventFormatters.cs
new file mode 100644
index 0000000..c13e230
--- /dev/null
+++ b/src/CloudNative.CloudEvents/CloudEventFormatters.cs
@@ -0,0 +1,205 @@
+// Copyright 2025 Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+using CloudNative.CloudEvents.Core;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Net.Mime;
+using System.Threading.Tasks;
+
+namespace CloudNative.CloudEvents;
+
+///
+/// Extension methods for .
+///
+public static class CloudEventFormatters
+{
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static CloudEventFormatter WithDefaultDecodingExtensions(this CloudEventFormatter formatter, IEnumerable extensionAttributes) =>
+ new DefaultExtensionsFormatter(formatter, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static CloudEventFormatter WithValidation(this CloudEventFormatter formatter, Action validationAction) =>
+ WithValidation(formatter, validationAction, validationAction);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static CloudEventFormatter WithValidation(this CloudEventFormatter formatter, Action? encodingValidationAction, Action? decodingValidationAction) =>
+ new ValidatingFormatter(formatter, encodingValidationAction, decodingValidationAction);
+
+ ///
+ /// Delegating formatter which applies additional validation before each encoding method, and/or after each decoding method.
+ ///
+ private class ValidatingFormatter : DelegatingFormatter
+ {
+ private readonly Action? encodingValidationAction;
+ private readonly Action? decodingValidationAction;
+
+ internal ValidatingFormatter(CloudEventFormatter formatter, Action? encodingValidationAction, Action? decodingValidationAction) : base(formatter)
+ {
+ this.encodingValidationAction = encodingValidationAction;
+ this.decodingValidationAction = decodingValidationAction;
+ }
+
+ public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var result = base.DecodeBatchModeMessage(body, contentType, extensionAttributes);
+ foreach (var evt in result)
+ {
+ decodingValidationAction?.Invoke(evt);
+ }
+ return result;
+ }
+
+ public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var result = base.DecodeBatchModeMessage(body, contentType, extensionAttributes);
+ foreach (var evt in result)
+ {
+ decodingValidationAction?.Invoke(evt);
+ }
+ return result;
+ }
+
+ public override async Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var result = await base.DecodeBatchModeMessageAsync(body, contentType, extensionAttributes).ConfigureAwait(false);
+ foreach (var evt in result)
+ {
+ decodingValidationAction?.Invoke(evt);
+ }
+ return result;
+ }
+
+ public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent)
+ {
+ base.DecodeBinaryModeEventData(body, cloudEvent);
+ decodingValidationAction?.Invoke(cloudEvent);
+ }
+
+ public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var evt = base.DecodeStructuredModeMessage(body, contentType, extensionAttributes);
+ decodingValidationAction?.Invoke(evt);
+ return evt;
+ }
+
+ public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var evt = base.DecodeStructuredModeMessage(messageBody, contentType, extensionAttributes);
+ decodingValidationAction?.Invoke(evt);
+ return evt;
+ }
+
+ public override async Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes)
+ {
+ var evt = await base.DecodeStructuredModeMessageAsync(body, contentType, extensionAttributes).ConfigureAwait(false);
+ decodingValidationAction?.Invoke(evt);
+ return evt;
+ }
+
+ public override ReadOnlyMemory EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType)
+ {
+ // This approach ensures that we only evaluate the original sequence once, without having to materialize it.
+ // It does mean we could end up encoding some before aborting later (and therefore wasting effort) though.
+ var validating = cloudEvents.Select(evt => { encodingValidationAction?.Invoke(evt); return evt; });
+ return base.EncodeBatchModeMessage(validating, out contentType);
+ }
+
+ public override ReadOnlyMemory EncodeBinaryModeEventData(CloudEvent cloudEvent)
+ {
+ encodingValidationAction?.Invoke(cloudEvent);
+ return base.EncodeBinaryModeEventData(cloudEvent);
+ }
+
+ public override ReadOnlyMemory EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
+ {
+ encodingValidationAction?.Invoke(cloudEvent);
+ return base.EncodeStructuredModeMessage(cloudEvent, out contentType);
+ }
+ }
+
+ ///
+ /// Delegating formatter which applies default extension attributes on all decoding method calls.
+ ///
+ private class DefaultExtensionsFormatter : DelegatingFormatter
+ {
+ private readonly IReadOnlyList defaultExtensionAttributes;
+
+ internal DefaultExtensionsFormatter(CloudEventFormatter formatter, IEnumerable extensionAttributes) : base(formatter)
+ {
+ defaultExtensionAttributes = Validation.CheckNotNull(extensionAttributes, nameof(extensionAttributes)).ToList().AsReadOnly();
+ foreach (var attribute in defaultExtensionAttributes)
+ {
+ if (!attribute.IsExtension)
+ {
+ throw new ArgumentException($"The {nameof(CloudEventAttribute.IsExtension)} must be true for all default extension attributes", nameof(extensionAttributes));
+ }
+ }
+ }
+
+ public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeBatchModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeBatchModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ public override Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeBatchModeMessageAsync(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeStructuredModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeStructuredModeMessage(messageBody, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ public override Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ base.DecodeStructuredModeMessageAsync(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes));
+
+ private IEnumerable ComputeEffectiveExtensionAttributes(IEnumerable? additionalExtensions)
+ {
+ if (additionalExtensions is null)
+ {
+ return defaultExtensionAttributes;
+ }
+ var result = new List();
+
+ foreach (var additional in additionalExtensions)
+ {
+ var match = defaultExtensionAttributes.FirstOrDefault(def => def.Name == additional.Name);
+ if (match is not null)
+ {
+ if (match != additional)
+ {
+ // TODO: Improve this message
+ throw new ArgumentException($"The extension attribute {match.Name} is already provided as a default, using a different object");
+ }
+ }
+ else
+ {
+ result.Add(additional);
+ }
+ }
+ result.AddRange(defaultExtensionAttributes);
+ return result;
+ }
+ }
+}
diff --git a/src/CloudNative.CloudEvents/DelegatingFormatter.cs b/src/CloudNative.CloudEvents/DelegatingFormatter.cs
new file mode 100644
index 0000000..c444a14
--- /dev/null
+++ b/src/CloudNative.CloudEvents/DelegatingFormatter.cs
@@ -0,0 +1,151 @@
+// Copyright 2025 Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+using CloudNative.CloudEvents.Core;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Mime;
+using System.Threading.Tasks;
+
+namespace CloudNative.CloudEvents;
+
+// TODO:
+// - Naming:
+// - Is the namespace appropriate? We have very few types in this root namespace
+// - Delegating or Decorating?
+// - Fill in the "CloudEvent" part, e.g. DelegatingCloudEventFormatter?
+// - Expose the original formatter, perhaps in a protected way?
+// - All documentation
+// - We can't call original.InferContentType, because it's protected. Is that an issue? (Users can always override GetOrInferContentType instead.)
+// - Are we happy with this being public? With the extension methods being public, this class doesn't have to be.
+// - Are we happy with the extension methods being extension methods?
+
+///
+///
+///
+public abstract class DelegatingFormatter : CloudEventFormatter
+{
+ ///
+ /// The formatter to delegate to.
+ ///
+ private readonly CloudEventFormatter original;
+
+ ///
+ ///
+ ///
+ ///
+ public DelegatingFormatter(CloudEventFormatter original)
+ {
+ this.original = Validation.CheckNotNull(original, nameof(original));
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeStructuredModeMessage(body, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeStructuredModeMessage(messageBody, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeStructuredModeMessageAsync(body, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override ReadOnlyMemory EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) =>
+ original.EncodeStructuredModeMessage(cloudEvent, out contentType);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent) =>
+ original.DecodeBinaryModeEventData(body, cloudEvent);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override ReadOnlyMemory EncodeBinaryModeEventData(CloudEvent cloudEvent) =>
+ original.EncodeBinaryModeEventData(cloudEvent);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeBatchModeMessage(body, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeBatchModeMessage(body, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) =>
+ original.DecodeBatchModeMessageAsync(body, contentType, extensionAttributes);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override ReadOnlyMemory EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType) =>
+ original.EncodeBatchModeMessage(cloudEvents, out contentType);
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public override string? GetOrInferDataContentType(CloudEvent cloudEvent) =>
+ original.GetOrInferDataContentType(cloudEvent);
+}