Skip to content

Feature - Aggregate event stream inspection #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ For actual configuration, please consult the setup instructions that will be pro
only the keys set in the `axoniq.console.dlq-diagnostics-whitelist` will be shown.
* `full` - all the message payloads, aggregate identifier, and the diagnostics data will be visible.


* `axoniq.console.dlq-diagnostics-whitelist` - a comma-separated list of messages that will be shown in the DLQ
diagnostics.


* `axoniq.console.domain-event-access-mode` – change the visibility and access level of Domain Events in the system,
default: `none`. Several options are available:

* `none` – no payloads are visible, and aggregate loading is disabled.

* `preview_payload_only` – payloads of events are visible via the API or UI, but snapshot loading is disabled.

* `load_domain_state_only` – event payloads are not shown, but domain state loading is enabled for the domain entity.

* `full` – full access: event payloads are visible, and aggregate loading is supported.

## Data sent to AxonIQ

AxonIQ Console is an [AxonIQ](https://axoniq.io) SaaS product. Your application will periodically, or upon request, send
Expand Down
2 changes: 1 addition & 1 deletion console-framework-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.console</groupId>
<artifactId>console-framework-client-parent</artifactId>
<version>1.9.3-SNAPSHOT</version>
<version>1.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
* Copyright (c) 2022-2025. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.axoniq.console.framework;
package io.axoniq.console.framework.api;

public enum AxoniqConsoleDlqMode {
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.axoniq.console.framework.api;

public enum DomainEventAccessMode {
/**
* Full access: payload is visible and loading domain state is supported.
*/
FULL,

/**
* Payload is hidden (e.g., masked), but loading domain state is still supported.
*/
LOAD_DOMAIN_STATE_ONLY,

/**
* Payload is visible, but loading domain state is not supported.
*/
PREVIEW_PAYLOAD_ONLY,

/**
* No access: payload is hidden and loading domain state is not supported.
*/
NONE
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ object Routes {
const val REPORT = "application-info-report"
}

object Enity {
const val DOMAIN_EVENTS = "domain-events"
const val ENTITY_STATE_AT_SEQUENCE = "entity-state-at-sequence"
}

object MessageFlow {
const val STATS = "message-flow-stats"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.axoniq.console.framework.api

import java.time.Instant

data class DomainEventsResult(
val entityId: String,
val entityType: String,
val domainEvents: List<DomainEvent>,
val page: Int,
val pageSize: Int,
val totalCount: Long,
)

data class DomainEvent(
val sequenceNumber: Long,
val timestamp: Instant,
val payloadType: String,
val payload: String?
)

data class DomainEventsQuery(
val entityId: String,
val page: Int = 0,
val pageSize: Int = 10,
)

data class EntityStateResult(
val type: String,
val entityId: String,
val maxSequenceNumber: Long = 0,
val state: String,
)

data class EntityStateAtSequenceQuery(
val type: String,
val entityId: String,
val maxSequenceNumber: Long = 0,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
* Copyright (c) 2022-2025. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,13 +85,13 @@ data class ConsoleClientIdentifier(
)

data class SetupPayload(
val features: SupportedFeatures? = SupportedFeatures(),
val commandBus: CommandBusInformation,
val queryBus: QueryBusInformation,
val eventStore: EventStoreInformation,
val processors: List<EventProcessorInformation>,
val versions: Versions,
val upcasters: List<String>,
val features: SupportedFeatures = SupportedFeatures(),
)

data class SupportedFeatures(
Expand All @@ -102,7 +102,11 @@ data class SupportedFeatures(
/* Whether the client supports pause/resume of reports.*/
val pauseReports: Boolean? = false,
/* Whether the client supports thread dumps.*/
val threadDump: Boolean? = false
val threadDump: Boolean? = false,
/* Whether the client supports DLQ insights. Can be FULL, LIMITED, MASKED, or NONE (default).*/
val deadLetterQueuesInsights: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE,
/* Whether the client supports domain events insights. Can be FULL, LOAD_DOMAIN_STATE_ONLY, PREVIEW_PAYLOAD_ONLY, or NONE (default).*/
val domainEventsInsights: DomainEventAccessMode = DomainEventAccessMode.NONE,
)

data class Versions(
Expand Down
2 changes: 1 addition & 1 deletion console-framework-client-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.console</groupId>
<artifactId>console-framework-client-parent</artifactId>
<version>1.9.3-SNAPSHOT</version>
<version>1.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class AxoniqConsoleAutoConfiguration {
.initialDelay(properties.initialDelay)
.disableSpanFactoryInConfiguration()
.managementMaxThreadPoolSize(properties.maxConcurrentManagementTasks)
.domainEventAccessMode(properties.domainEventAccessMode)
properties.dlqDiagnosticsWhitelist.forEach { builder.addDlqDiagnosticsWhitelistKey(it) }
return builder.build()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
* Copyright (c) 2022-2025. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,8 @@

package io.axoniq.console.framework.starter;

import io.axoniq.console.framework.AxoniqConsoleDlqMode;
import io.axoniq.console.framework.api.AxoniqConsoleDlqMode;
import io.axoniq.console.framework.api.DomainEventAccessMode;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +55,12 @@ public class AxoniqConsoleSpringProperties {
* the list as the keys to filter on.
*/
private List<String> dlqDiagnosticsWhitelist = new ArrayList<>();
/**
* The access mode for Domain Events. Defaults to {@code NONE}, meaning no event payload is visible and loading
* for aggregate reconstruction is disabled. If payload inspection is required, consider {@code PAYLOAD_ONLY}.
* For full functionality, use {@code FULL}, which enables both payload visibility and aggregate snapshot loading.
*/
private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE;
/**
* Whether the connection should use SSL/TLs. Defaults to {@code true}.
*/
Expand Down Expand Up @@ -117,6 +124,14 @@ public void setDlqDiagnosticsWhitelist(List<String> dlqDiagnosticsWhitelist) {
this.dlqDiagnosticsWhitelist = dlqDiagnosticsWhitelist;
}

public DomainEventAccessMode getDomainEventAccessMode() {
return domainEventAccessMode;
}

public void setDomainEventAccessMode(DomainEventAccessMode domainEventAccessMode) {
this.domainEventAccessMode = domainEventAccessMode;
}

public boolean isSecure() {
return secure;
}
Expand Down
2 changes: 1 addition & 1 deletion console-framework-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.console</groupId>
<artifactId>console-framework-client-parent</artifactId>
<version>1.9.3-SNAPSHOT</version>
<version>1.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
* Copyright (c) 2022-2025. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,17 @@

package io.axoniq.console.framework;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.axoniq.console.framework.api.AxoniqConsoleDlqMode;
import io.axoniq.console.framework.api.DomainEventAccessMode;
import io.axoniq.console.framework.application.DomainEventStreamProvider;
import io.axoniq.console.framework.application.ApplicationMetricRegistry;
import io.axoniq.console.framework.application.ApplicationMetricReporter;
import io.axoniq.console.framework.application.ApplicationReportCreator;
import io.axoniq.console.framework.application.ApplicationThreadDumpProvider;
import io.axoniq.console.framework.application.RSocketDomainEntityDataResponder;
import io.axoniq.console.framework.application.RSocketThreadDumpResponder;
import io.axoniq.console.framework.client.AxoniqConsoleRSocketClient;
import io.axoniq.console.framework.client.ClientSettingsService;
Expand Down Expand Up @@ -83,13 +90,15 @@ public class AxoniqConsoleConfigurerModule implements ConfigurerModule {
private final Long initialDelay;
private final AxoniqConsoleDlqMode dlqMode;
private final List<String> dlqDiagnosticsWhitelist;
private final DomainEventAccessMode domainEventAccessMode;
private final ScheduledExecutorService reportingTaskExecutor;
private final ExecutorService managementTaskExecutor;
private final boolean configureSpanFactory;
private final SpanMatcherPredicateMap spanMatcherPredicateMap;
private final EventScheduler eventScheduler;
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
private final String instanceName;
private final ObjectMapper objectMapper;

/**
* Creates the {@link AxoniqConsoleConfigurerModule} with the given {@code builder}.
Expand All @@ -107,11 +116,13 @@ protected AxoniqConsoleConfigurerModule(Builder builder) {
this.initialDelay = builder.initialDelay;
this.dlqMode = builder.dlqMode;
this.dlqDiagnosticsWhitelist = builder.dlqDiagnosticsWhitelist;
this.domainEventAccessMode = builder.domainEventAccessMode;
this.reportingTaskExecutor = builder.reportingTaskExecutor;
this.managementTaskExecutor = builder.managementTaskExecutor;
this.configureSpanFactory = !builder.disableSpanFactoryInConfiguration;
this.spanMatcherPredicateMap = builder.spanMatcherPredicateMap;
this.eventScheduler = builder.eventScheduler;
this.objectMapper = builder.objectMapper;
}

/**
Expand Down Expand Up @@ -152,7 +163,11 @@ public void configureModule(@NotNull Configurer configurer) {
)
)
.registerComponent(SetupPayloadCreator.class,
SetupPayloadCreator::new
c -> new SetupPayloadCreator(
c,
dlqMode,
domainEventAccessMode
)
)
.registerComponent(EventProcessorManager.class,
c -> new EventProcessorManager(
Expand Down Expand Up @@ -223,6 +238,12 @@ public void configureModule(@NotNull Configurer configurer) {
.registerComponent(ApplicationThreadDumpProvider.class,
c -> new ApplicationThreadDumpProvider()
)
.registerComponent(DomainEventStreamProvider.class,
c -> new DomainEventStreamProvider(
configurer.buildConfiguration(),
objectMapper
)
)
.registerComponent(RSocketDlqResponder.class,
c -> new RSocketDlqResponder(
c.getComponent(DeadLetterManager.class),
Expand All @@ -233,6 +254,13 @@ public void configureModule(@NotNull Configurer configurer) {
c.getComponent(ApplicationThreadDumpProvider.class),
c.getComponent(RSocketHandlerRegistrar.class)
))
.registerComponent(RSocketDomainEntityDataResponder.class,
c -> new RSocketDomainEntityDataResponder(
c.getComponent(DomainEventStreamProvider.class),
c.getComponent(RSocketHandlerRegistrar.class),
domainEventAccessMode,
c.eventSerializer()
))
.eventProcessing()
.registerDefaultHandlerInterceptor((
c, name) -> new AxoniqConsoleProcessorInterceptor(
Expand All @@ -259,6 +287,7 @@ public void configureModule(@NotNull Configurer configurer) {
c.getComponent(RSocketDlqResponder.class);
c.getComponent(HandlerMetricsRegistry.class);
c.getComponent(RSocketThreadDumpResponder.class);
c.getComponent(RSocketDomainEntityDataResponder.class);
});

configurer.onStart(() -> {
Expand Down Expand Up @@ -300,6 +329,7 @@ public static class Builder {
private String nodeId = randomNodeId();
private AxoniqConsoleDlqMode dlqMode = AxoniqConsoleDlqMode.NONE;
private final List<String> dlqDiagnosticsWhitelist = new ArrayList<>();
private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE;
private Long initialDelay = 0L;
private boolean disableSpanFactoryInConfiguration = false;
private final SpanMatcherPredicateMap spanMatcherPredicateMap = getSpanMatcherPredicateMap();
Expand All @@ -311,6 +341,9 @@ public static class Builder {
private Integer managementMaxThreadPoolSize = 5;
private EventScheduler eventScheduler;

private ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

/**
* Constructor to instantiate a {@link Builder} based on the fields contained in the
* {@link AxoniqConsoleConfigurerModule.Builder}. Requires the {@code environmentId}, {@code accessToken} and
Expand Down Expand Up @@ -405,6 +438,18 @@ public Builder addDlqDiagnosticsWhitelistKey(String key) {
return this;
}

/**
* The mode of domain event access. Defaults to {@link DomainEventAccessMode#NONE}, which means
* that no domain event payload is visible and aggregate reconstruction is not supported.
*
* @param domainEventAccessMode The access mode to set for domain events
* @return The builder for fluent interfacing
*/
public Builder domainEventAccessMode(DomainEventAccessMode domainEventAccessMode) {
BuilderUtils.assertNonNull(domainEventAccessMode, "Domain event access mode may not be null");
this.domainEventAccessMode = domainEventAccessMode;
return this;
}

/**
* The initial delay before attempting to establish a connection. Defaults to {@code 0}.
Expand Down Expand Up @@ -524,6 +569,19 @@ public Builder eventScheduler(EventScheduler eventScheduler) {
return this;
}

/**
* Set the object mapper to be used for serialization and deserialization of domain events.
* Defaults to a new {@link ObjectMapper} with all modules registered and field visibility set to any.
*
* @param objectMapper the object mapper to use
* @return The builder for fluent interfacing
*/
public Builder objectMapper(ObjectMapper objectMapper) {
BuilderUtils.assertNonNull(objectMapper, "Object mapper must be non-null");
this.objectMapper = objectMapper;
return this;
}

/**
* Builds the {@link AxoniqConsoleConfigurerModule} based on the fields set in this {@link Builder}.
*
Expand Down
Loading
Loading