Skip to content

Commit 9ebff0c

Browse files
gongwn1tzolov
authored andcommitted
feat: implement resource update notifications (#264)
This enables servers to notify clients when specific resources are updated, allowing clients to react to individual resource changes rather than just list changes. - Add ResourcesUpdatedNotification schema with uri field - Add METHOD_NOTIFICATION_RESOURCES_UPDATED constant - Implement notifyResourcesUpdated() methods in McpAsyncServer and McpSyncServer - Add client-side support for resource update consumers and handlers - Client automatically reads updated resource content when notification received - Add tests for both async and sync server implementations Signed-off-by: Christian Tzolov <[email protected]>
1 parent be719f5 commit 9ebff0c

File tree

10 files changed

+146
-7
lines changed

10 files changed

+146
-7
lines changed

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,18 @@ void testNotifyResourcesListChanged() {
194194
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
195195
}
196196

197+
@Test
198+
void testNotifyResourcesUpdated() {
199+
var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();
200+
201+
StepVerifier
202+
.create(mcpAsyncServer
203+
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
204+
.verifyComplete();
205+
206+
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
207+
}
208+
197209
@Test
198210
void testAddResource() {
199211
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,17 @@ void testNotifyResourcesListChanged() {
191191
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
192192
}
193193

194+
@Test
195+
void testNotifyResourcesUpdated() {
196+
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();
197+
198+
assertThatCode(() -> mcpSyncServer
199+
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
200+
.doesNotThrowAnyException();
201+
202+
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
203+
}
204+
194205
@Test
195206
void testAddResource() {
196207
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,18 @@ public class McpAsyncClient {
236236
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED,
237237
asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal));
238238

239+
// Resources Update Notification
240+
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumersFinal = new ArrayList<>();
241+
resourcesUpdateConsumersFinal
242+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification)));
243+
244+
if (!Utils.isEmpty(features.resourcesUpdateConsumers())) {
245+
resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers());
246+
}
247+
248+
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
249+
asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal));
250+
239251
// Prompts Change Notification
240252
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumersFinal = new ArrayList<>();
241253
promptsChangeConsumersFinal
@@ -867,6 +879,24 @@ private NotificationHandler asyncResourcesChangeNotificationHandler(
867879
.then());
868880
}
869881

882+
private NotificationHandler asyncResourcesUpdatedNotificationHandler(
883+
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers) {
884+
return params -> {
885+
McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params,
886+
new TypeReference<>() {
887+
});
888+
889+
return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri()))
890+
.flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers)
891+
.flatMap(consumer -> consumer.apply(readResourceResult.contents()))
892+
.onErrorResume(error -> {
893+
logger.error("Error handling resource update notification", error);
894+
return Mono.empty();
895+
})
896+
.then());
897+
};
898+
}
899+
870900
// --------------------------
871901
// Prompts
872902
// --------------------------

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class SyncSpec {
171171

172172
private final List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = new ArrayList<>();
173173

174+
private final List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers = new ArrayList<>();
175+
174176
private final List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = new ArrayList<>();
175177

176178
private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();
@@ -382,8 +384,8 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
382384
*/
383385
public McpSyncClient build() {
384386
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
385-
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
386-
this.loggingConsumers, this.samplingHandler, this.elicitationHandler);
387+
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
388+
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, this.elicitationHandler);
387389

388390
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
389391

@@ -427,6 +429,8 @@ class AsyncSpec {
427429

428430
private final List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers = new ArrayList<>();
429431

432+
private final List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers = new ArrayList<>();
433+
430434
private final List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();
431435

432436
private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();
@@ -589,6 +593,23 @@ public AsyncSpec resourcesChangeConsumer(
589593
return this;
590594
}
591595

596+
/**
597+
* Adds a consumer to be notified when a specific resource is updated. This allows
598+
* the client to react to changes in individual resources, such as updates to
599+
* their content or metadata.
600+
* @param resourcesUpdateConsumer A consumer function that processes the updated
601+
* resource and returns a Mono indicating the completion of the processing. Must
602+
* not be null.
603+
* @return This builder instance for method chaining.
604+
* @throws IllegalArgumentException If the resourcesUpdateConsumer is null.
605+
*/
606+
public AsyncSpec resourcesUpdateConsumer(
607+
Function<List<McpSchema.ResourceContents>, Mono<Void>> resourcesUpdateConsumer) {
608+
Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null");
609+
this.resourcesUpdateConsumers.add(resourcesUpdateConsumer);
610+
return this;
611+
}
612+
592613
/**
593614
* Adds a consumer to be notified when the available prompts change. This allows
594615
* the client to react to changes in the server's prompt templates, such as new
@@ -641,8 +662,9 @@ public AsyncSpec loggingConsumers(
641662
public McpAsyncClient build() {
642663
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
643664
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
644-
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
645-
this.loggingConsumers, this.samplingHandler, this.elicitationHandler));
665+
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
666+
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler,
667+
this.elicitationHandler));
646668
}
647669

648670
}

mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class McpClientFeatures {
6565
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
6666
Map<String, McpSchema.Root> roots, List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
6767
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
68+
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
6869
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
6970
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
7071
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
@@ -85,6 +86,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
8586
Map<String, McpSchema.Root> roots,
8687
List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
8788
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
89+
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
8890
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
8991
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
9092
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
@@ -101,6 +103,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
101103

102104
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
103105
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
106+
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
104107
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
105108
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
106109
this.samplingHandler = samplingHandler;
@@ -128,8 +131,13 @@ public static Async fromSync(Sync syncSpec) {
128131
.subscribeOn(Schedulers.boundedElastic()));
129132
}
130133

131-
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();
134+
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers = new ArrayList<>();
135+
for (Consumer<List<McpSchema.ResourceContents>> consumer : syncSpec.resourcesUpdateConsumers()) {
136+
resourcesUpdateConsumers.add(r -> Mono.<Void>fromRunnable(() -> consumer.accept(r))
137+
.subscribeOn(Schedulers.boundedElastic()));
138+
}
132139

140+
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();
133141
for (Consumer<List<McpSchema.Prompt>> consumer : syncSpec.promptsChangeConsumers()) {
134142
promptsChangeConsumers.add(p -> Mono.<Void>fromRunnable(() -> consumer.accept(p))
135143
.subscribeOn(Schedulers.boundedElastic()));
@@ -150,8 +158,8 @@ public static Async fromSync(Sync syncSpec) {
150158
.subscribeOn(Schedulers.boundedElastic());
151159

152160
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
153-
toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers,
154-
samplingHandler, elicitationHandler);
161+
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
162+
loggingConsumers, samplingHandler, elicitationHandler);
155163
}
156164
}
157165

@@ -172,6 +180,7 @@ public static Async fromSync(Sync syncSpec) {
172180
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
173181
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
174182
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
183+
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
175184
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
176185
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
177186
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
@@ -184,6 +193,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
184193
* @param roots the roots.
185194
* @param toolsChangeConsumers the tools change consumers.
186195
* @param resourcesChangeConsumers the resources change consumers.
196+
* @param resourcesUpdateConsumers the resource update consumers.
187197
* @param promptsChangeConsumers the prompts change consumers.
188198
* @param loggingConsumers the logging consumers.
189199
* @param samplingHandler the sampling handler.
@@ -192,6 +202,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
192202
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
193203
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
194204
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
205+
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
195206
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
196207
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
197208
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
@@ -208,6 +219,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
208219

209220
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
210221
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
222+
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
211223
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
212224
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
213225
this.samplingHandler = samplingHandler;

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,15 @@ public Mono<Void> notifyResourcesListChanged() {
430430
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null);
431431
}
432432

433+
/**
434+
* Notifies clients that the resources have updated.
435+
* @return A Mono that completes when all clients have been notified
436+
*/
437+
public Mono<Void> notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
438+
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
439+
resourcesUpdatedNotification);
440+
}
441+
433442
private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
434443
return (exchange, params) -> {
435444
var resourceList = this.resources.values()

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ public void notifyResourcesListChanged() {
141141
this.asyncServer.notifyResourcesListChanged().block();
142142
}
143143

144+
/**
145+
* Notify clients that the resources have updated.
146+
*/
147+
public void notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
148+
this.asyncServer.notifyResourcesUpdated(resourcesUpdatedNotification).block();
149+
}
150+
144151
/**
145152
* Notify clients that the list of available prompts has changed.
146153
*/

mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ private McpSchema() {
7171

7272
public static final String METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED = "notifications/resources/list_changed";
7373

74+
public static final String METHOD_NOTIFICATION_RESOURCES_UPDATED = "notifications/resources/updated";
75+
7476
public static final String METHOD_RESOURCES_TEMPLATES_LIST = "resources/templates/list";
7577

7678
public static final String METHOD_RESOURCES_SUBSCRIBE = "resources/subscribe";
@@ -1314,6 +1316,17 @@ public record ProgressNotification(// @formatter:off
13141316
@JsonProperty("total") Double total) {
13151317
}// @formatter:on
13161318

1319+
/**
1320+
* The Model Context Protocol (MCP) provides a standardized way for servers to send
1321+
* resources update message to clients.
1322+
*
1323+
* @param uri The updated resource uri.
1324+
*/
1325+
@JsonIgnoreProperties(ignoreUnknown = true)
1326+
public record ResourcesUpdatedNotification(// @formatter:off
1327+
@JsonProperty("uri") String uri) {
1328+
}// @formatter:on
1329+
13171330
/**
13181331
* The Model Context Protocol (MCP) provides a standardized way for servers to send
13191332
* structured log messages to clients. Clients can control logging verbosity by

mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,18 @@ void testNotifyResourcesListChanged() {
193193
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
194194
}
195195

196+
@Test
197+
void testNotifyResourcesUpdated() {
198+
var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();
199+
200+
StepVerifier
201+
.create(mcpAsyncServer
202+
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
203+
.verifyComplete();
204+
205+
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
206+
}
207+
196208
@Test
197209
void testAddResource() {
198210
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())

mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,17 @@ void testNotifyResourcesListChanged() {
190190
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
191191
}
192192

193+
@Test
194+
void testNotifyResourcesUpdated() {
195+
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();
196+
197+
assertThatCode(() -> mcpSyncServer
198+
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
199+
.doesNotThrowAnyException();
200+
201+
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
202+
}
203+
193204
@Test
194205
void testAddResource() {
195206
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())

0 commit comments

Comments
 (0)