This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit 484a7f3

feat: restore agents on demand (#949)
1 parent 60cf404 commit 484a7f3

File tree: 4 files changed (+76, -102 lines)


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,10 @@ NOTE: all releases may include dependency updates, not specifically mentioned
 
 - feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)
 
+## 2025-07-14 - Runtime v0.18.6
+
+- feat: restore agents on demand [#949](https://github.com/hypermodeinc/modus/pull/949)
+
 ## 2025-07-12 - Runtime v0.18.5
 
 - fix: sentry source context [#940](https://github.com/hypermodeinc/modus/pull/940)

runtime/actors/actorsystem.go

Lines changed: 21 additions & 79 deletions
@@ -8,10 +8,9 @@ package actors
 import (
 	"context"
 	"fmt"
-	"math/rand/v2"
+	"sync"
 	"time"
 
-	"github.com/hypermodeinc/modus/runtime/db"
 	"github.com/hypermodeinc/modus/runtime/logger"
 	"github.com/hypermodeinc/modus/runtime/messages"
 	"github.com/hypermodeinc/modus/runtime/pluginmanager"
@@ -82,7 +81,13 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
 	}
 
 	// important: wait for the actor system to sync with the cluster before proceeding
-	waitForClusterSync(ctx)
+	if clusterEnabled() {
+		select {
+		case <-time.After(peerSyncInterval()):
+		case <-ctx.Done():
+			logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
+		}
+	}
 
 	return nil
 }
@@ -106,68 +111,9 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
 		}
 	}
 
-	// do this in a goroutine to avoid blocking the cluster engine startup
-	go func() {
-		if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
-			const msg = "Failed to restore agent actors."
-			sentryutils.CaptureError(ctx, err, msg)
-			logger.Error(ctx, err).Msg(msg)
-		}
-	}()
-
 	return nil
 }
 
-// restoreAgentActors spawn actors for agents with state in the database, that are not already running
-func restoreAgentActors(ctx context.Context, pluginName string) error {
-	span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
-	defer span.Finish()
-
-	logger.Debug(ctx).Msg("Restoring agent actors from database.")
-
-	// query the database for active agents
-	agents, err := db.QueryActiveAgents(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to query active agents from database: %w", err)
-	}
-
-	// shuffle the agents to help distribute the load across the cluster when multiple nodes are starting simultaneously
-	rand.Shuffle(len(agents), func(i, j int) {
-		agents[i], agents[j] = agents[j], agents[i]
-	})
-
-	// spawn actors for each agent that is not already running
-	for _, agent := range agents {
-		actorName := getActorName(agent.Id)
-		if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
-			const msg = "Failed to check if agent actor exists."
-			sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
-			logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
-		} else if !exists {
-			err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
-			if err != nil {
-				const msg = "Failed to spawn actor for agent."
-				sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
-				logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
-			}
-		}
-	}
-
-	return nil
-}
-
-// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
-// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
-func waitForClusterSync(ctx context.Context) {
-	if clusterEnabled() {
-		select {
-		case <-time.After(peerSyncInterval()):
-		case <-ctx.Done():
-			logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
-		}
-	}
-}
-
 func Shutdown(ctx context.Context) {
 	span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
 	defer span.Finish()
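The two hunks above inline the old waitForClusterSync helper into startActorSystem (its last remaining call site) and delete both the helper and the eager restoreAgentActors startup pass, since agents are now restored on demand when a message arrives (see the agents.go hunk further down). The cluster-sync wait itself is just a bounded wait that respects context cancellation. A minimal, self-contained sketch of that pattern (the helper name and durations below are illustrative, not the runtime's actual values):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel blocks for up to d, returning early if ctx is cancelled.
// It mirrors the inlined cluster-sync wait shown above.
func waitOrCancel(ctx context.Context, d time.Duration) {
	select {
	case <-time.After(d):
		// the sync window elapsed; proceed with startup
	case <-ctx.Done():
		// the caller is shutting down; stop waiting immediately
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	waitOrCancel(ctx, 2*time.Second) // returns after ~500ms because ctx expires first
	fmt.Println("done waiting")
}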
@@ -209,29 +155,25 @@ func (sh *shutdownHook) Execute(ctx context.Context, actorSystem goakt.ActorSyst
 
 	// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
 	// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
+	// We use goroutines and a wait group to do this concurrently.
+	var wg sync.WaitGroup
 	for _, pid := range actors {
 		if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
 			if actor.status == AgentStatusRunning {
-				ctx := actor.augmentContext(ctx, pid)
-				if err := actor.suspendAgent(ctx); err != nil {
-					const msg = "Failed to suspend agent actor."
-					sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
-					logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
-				}
-			}
-		}
-	}
-
-	// Then shut down subscription actors. They will have received the suspend message already.
-	for _, pid := range actors {
-		if a, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
-			if err := pid.Shutdown(ctx); err != nil {
-				const msg = "Failed to shut down subscription actor."
-				sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", a.agentId))
-				logger.Error(ctx, err).Str("agent_id", a.agentId).Msg(msg)
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					ctx := actor.augmentContext(ctx, pid)
+					if err := actor.suspendAgent(ctx); err != nil {
+						const msg = "Failed to suspend agent actor."
+						sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
+						logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
+					}
+				}()
 			}
 		}
 	}
+	wg.Wait()
 
 	// Then allow the actor system to continue with its shutdown process.
 	return nil
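The shutdown hook previously suspended agents one at a time and then shut down subscription actors in a second loop; the new version drops that second loop and suspends all running agents concurrently, waiting on a sync.WaitGroup before letting the actor system continue its shutdown. A stripped-down sketch of that fan-out-and-wait pattern, using a stand-in suspend function (agent IDs, sleep, and function names below are illustrative only):

package main

import (
	"fmt"
	"sync"
	"time"
)

// suspend stands in for actor.suspendAgent: it persists an agent's state and may fail.
func suspend(agentID string) error {
	time.Sleep(100 * time.Millisecond) // simulate persisting state
	fmt.Println("suspended", agentID)
	return nil
}

func main() {
	agentIDs := []string{"agent-1", "agent-2", "agent-3"}

	var wg sync.WaitGroup
	for _, id := range agentIDs {
		wg.Add(1)
		go func(id string) { // pass id explicitly so each goroutine has its own copy
			defer wg.Done()
			if err := suspend(id); err != nil {
				fmt.Printf("failed to suspend %s: %v\n", id, err)
			}
		}(id)
	}
	wg.Wait() // block shutdown until every suspend attempt has finished

	fmt.Println("all agents suspended; continuing shutdown")
}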

runtime/actors/agents.go

Lines changed: 49 additions & 23 deletions
@@ -98,12 +98,6 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
 		}),
 	)
 
-	// Important: Wait for the actor system to sync with the cluster before proceeding.
-	// This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times.
-	// GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually.
-	// A short sync time should not be noticeable by the user.
-	waitForClusterSync(ctx)
-
 	return err
 }
 
@@ -208,26 +202,58 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
 	}
 
 	var err error
-	var res proto.Message
-	if timeout == 0 {
-		err = tell(ctx, actorName, msg)
-	} else {
-		res, err = ask(ctx, actorName, msg, time.Duration(timeout))
-	}
+	const maxRetries = 3
+	for attempt := 1; attempt <= maxRetries; attempt++ {
+
+		var res proto.Message
+		if timeout == 0 {
+			err = tell(ctx, actorName, msg)
+		} else {
+			res, err = ask(ctx, actorName, msg, time.Duration(timeout))
+		}
 
-	if errors.Is(err, goakt.ErrActorNotFound) {
-		return newAgentMessageErrorResponse("agent not found"), nil
-	} else if err != nil {
-		return nil, fmt.Errorf("error sending message to agent: %w", err)
-	}
+		if err == nil {
+			if res == nil {
+				return newAgentMessageDataResponse(nil), nil
+			} else if response, ok := res.(*messages.AgentResponse); ok {
+				return newAgentMessageDataResponse(response.Data), nil
+			} else {
+				return nil, fmt.Errorf("unexpected agent response type: %T", res)
+			}
+		}
 
-	if res == nil {
-		return newAgentMessageDataResponse(nil), nil
-	} else if response, ok := res.(*messages.AgentResponse); ok {
-		return newAgentMessageDataResponse(response.Data), nil
-	} else {
-		return nil, fmt.Errorf("unexpected agent response type: %T", res)
+		if errors.Is(err, goakt.ErrActorNotFound) {
+			state, err := db.GetAgentState(ctx, agentId)
+			if err != nil {
+				return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err)
+			}
+			if state == nil {
+				return newAgentMessageErrorResponse("agent not found"), nil
+			}
+
+			switch AgentStatus(state.Status) {
+			case AgentStatusStopping, AgentStatusTerminated:
+				return newAgentMessageErrorResponse("agent is no longer available"), nil
+			}
+
+			// Restart the agent actor locally if it is not running.
+			var pluginName string
+			if plugin, ok := plugins.GetPluginFromContext(ctx); !ok {
+				return nil, fmt.Errorf("no plugin found in context")
+			} else {
+				pluginName = plugin.Name()
+			}
+			agentName := state.Name
+			if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil {
+				return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err)
+			}
+
+			// Retry sending the message to the agent actor.
+			continue
+		}
 	}
+
+	return nil, fmt.Errorf("error sending message to agent: %w", err)
 }
 
 func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
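This hunk is the heart of the "restore agents on demand" behavior: when a send fails with goakt.ErrActorNotFound, the runtime looks up the agent's persisted state via db.GetAgentState, refuses if the agent is stopping or terminated, respawns the actor through spawnActorForAgent, and retries the send, up to three attempts. A simplified, self-contained sketch of that control flow with stubbed dependencies (the stub types and helper functions below are illustrative, not the runtime's actual API):

package main

import (
	"errors"
	"fmt"
)

// errActorNotFound stands in for goakt.ErrActorNotFound.
var errActorNotFound = errors.New("actor not found")

type agentState struct {
	Name   string
	Status string // e.g. "running", "suspended", "terminated"
}

// The three functions below are stubs for the runtime's messaging, persistence,
// and actor-spawning layers.
func send(agentID, msg string) (string, error)      { return "", errActorNotFound }
func loadState(agentID string) (*agentState, error) { return &agentState{Name: "counter", Status: "suspended"}, nil }
func respawn(agentID string, st *agentState) error  { fmt.Println("respawning", agentID, "as", st.Name); return nil }

// sendWithRestore retries a send, restoring the agent from persisted state
// when its actor is not currently running anywhere in the cluster.
func sendWithRestore(agentID, msg string) (string, error) {
	const maxRetries = 3
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		var res string
		if res, err = send(agentID, msg); err == nil {
			return res, nil
		}
		if !errors.Is(err, errActorNotFound) {
			break // a real failure; don't retry
		}
		st, stateErr := loadState(agentID)
		if stateErr != nil {
			return "", stateErr
		}
		if st == nil || st.Status == "terminated" {
			return "", fmt.Errorf("agent %s is no longer available", agentID)
		}
		if err = respawn(agentID, st); err != nil {
			return "", err
		}
		// loop around and retry the send against the freshly spawned actor
	}
	return "", fmt.Errorf("error sending message to agent: %w", err)
}

func main() {
	if _, err := sendWithRestore("agent-42", "ping"); err != nil {
		fmt.Println(err)
	}
}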

runtime/actors/subscriber.go

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(da
 		return fmt.Errorf("failed to subscribe to topic: %w", err)
 	}
 
+	logger.Debug(ctx).Msgf("Subscribed to topic %s with subscription actor %s", topic, subActor.Name())
+
 	// When the context is done, we will unsubscribe and stop the subscription actor.
 	// For example, the GraphQL subscription is closed or the client disconnects.
 	go func() {
