Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package agent

import (
"encoding/json"
"fmt"
"time"

"github.com/argoproj-labs/argocd-agent/internal/auth"
"github.com/argoproj-labs/argocd-agent/internal/backend"
"github.com/argoproj-labs/argocd-agent/internal/checkpoint"
"github.com/argoproj-labs/argocd-agent/internal/event"
Expand Down Expand Up @@ -386,6 +388,13 @@ func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
}

resyncHandler := resync.NewRequestHandler(dynClient, sendQ, a.emitter, a.resources, logCtx, manager.ManagerRoleAgent, a.namespace)
subject := &auth.AuthSubject{}
err = json.Unmarshal([]byte(a.remote.ClientID()), subject)
if err != nil {
return fmt.Errorf("failed to extract agent name from client ID: %w", err)
}

agentName := subject.ClientID

switch ev.Type() {
case event.SyncedResourceList:
Expand All @@ -398,7 +407,7 @@ func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
return err
}

return resyncHandler.ProcessSyncedResourceListRequest(a.remote.ClientID(), req)
return resyncHandler.ProcessSyncedResourceListRequest(agentName, req)
case event.ResponseSyncedResource:
if a.mode != types.AgentModeManaged {
return fmt.Errorf("agent can only handle SyncedResource request in the managed mode")
Expand All @@ -409,7 +418,7 @@ func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
return err
}

return resyncHandler.ProcessIncomingSyncedResource(a.context, req, a.remote.ClientID())
return resyncHandler.ProcessIncomingSyncedResource(a.context, req, agentName)
case event.EventRequestUpdate:
if a.mode != types.AgentModeAutonomous {
return fmt.Errorf("agent can only handle RequestUpdate in the autonomous mode")
Expand All @@ -420,13 +429,23 @@ func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
return err
}

return resyncHandler.ProcessRequestUpdateEvent(a.context, a.remote.ClientID(), incoming)
// For autonomous agents, the principal stores AppProjects with a prefixed name (agent-name + "-" + project-name).
// When the principal sends a RequestUpdate, it uses the prefixed name. We need to strip the prefix
// before looking up the resource locally.
if incoming.Kind == "AppProject" {
prefix := agentName + "-"
if len(incoming.Name) > len(prefix) && incoming.Name[:len(prefix)] == prefix {
incoming.Name = incoming.Name[len(prefix):]
}
}

return resyncHandler.ProcessRequestUpdateEvent(a.context, agentName, incoming)
case event.EventRequestResourceResync:
if a.mode != types.AgentModeManaged {
return fmt.Errorf("agent can only handle ResourceResync request in the managed mode")
}

return resyncHandler.ProcessIncomingResourceResyncRequest(a.context, a.remote.ClientID())
return resyncHandler.ProcessIncomingResourceResyncRequest(a.context, agentName)
default:
return fmt.Errorf("invalid type of resource resync: %s", ev.Type())
}
Expand Down
13 changes: 12 additions & 1 deletion agent/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package agent

import (
"context"
"encoding/json"
"fmt"
"testing"

Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"

"github.com/argoproj-labs/argocd-agent/internal/auth"
backend_mocks "github.com/argoproj-labs/argocd-agent/internal/backend/mocks"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/manager"
Expand Down Expand Up @@ -1468,7 +1470,16 @@ func Test_processIncomingResourceResyncEvent(t *testing.T) {
a.namespace = "test"
a.context = context.Background()

err := a.queues.Create(a.remote.ClientID())
subject := &auth.AuthSubject{
ClientID: "test",
}
subjectJSON, err := json.Marshal(subject)
if err != nil {
t.Fatalf("Failed to marshal subject: %v", err)
}
a.remote.SetClientID(string(subjectJSON))
fmt.Println("a.remote.ClientID()", a.remote.ClientID())
err = a.queues.Create(a.remote.ClientID())
assert.Nil(t, err)
a.emitter = event.NewEventSource("test")

Expand Down
1 change: 1 addition & 0 deletions internal/resync/resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (r *RequestHandler) ProcessIncomingResourceResyncRequest(ctx context.Contex
r.log.Errorf("failed to send request update for resource %s: %v", resource.Name, err)
continue
}
r.log.WithField(logfields.Kind, resource.Kind).WithField(logfields.Name, resource.Name).Trace("Sent a request update event")
}

return nil
Expand Down
23 changes: 16 additions & 7 deletions principal/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,20 @@ func (s *Server) newAppProjectCallback(outbound *v1alpha1.AppProject) {
"appproject_name": outbound.Name,
})

s.resources.Add(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound))

// Check if this AppProject was created by an autonomous agent
if isResourceFromAutonomousAgent(outbound) {
// For autonomous agents, the agent name may be different from the namespace name.
// SourceNamespaces[0] contains the exact agent name.
if len(outbound.Spec.SourceNamespaces) > 0 {
agentName := outbound.Spec.SourceNamespaces[0]
s.resources.Add(agentName, resources.NewResourceKeyFromAppProject(outbound))
}
logCtx.Debugf("Discarding event, because the appProject is managed by an autonomous agent")
return
}

s.resources.Add(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound))

// Return early if no interested agent is connected
if !s.queues.HasQueuePair(outbound.Namespace) {
if err := s.queues.Create(outbound.Namespace); err != nil {
Expand Down Expand Up @@ -292,16 +298,19 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) {
logCtx.Trace("Deleted appProject is recreated")
return
}
}

s.resources.Remove(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound))

// Check if this AppProject was created by an autonomous agent by examining its name prefix
if isResourceFromAutonomousAgent(outbound) {
// For autonomous agents, the agent name may be different from the namespace name.
// SourceNamespaces[0] contains the exact agent name.
if len(outbound.Spec.SourceNamespaces) > 0 {
agentName := outbound.Spec.SourceNamespaces[0]
s.resources.Remove(agentName, resources.NewResourceKeyFromAppProject(outbound))
}
logCtx.Debugf("Discarding event, because the appProject is managed by an autonomous agent")
return
}

s.resources.Remove(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound))

if !s.queues.HasQueuePair(outbound.Namespace) {
if err := s.queues.Create(outbound.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
Expand Down
11 changes: 11 additions & 0 deletions principal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,17 @@ func (s *Server) processIncomingResourceResyncEvent(ctx context.Context, agentNa
return err
}

// For autonomous agents, the agent sends RequestUpdate with the local AppProject name (e.g., "sample"),
// but the principal stores it with a prefixed name (e.g., "agent-autonomous-sample").
// We need to add the prefix before looking it up locally.
if agentMode == types.AgentModeAutonomous && incoming.Kind == "AppProject" {
prefixedName, err := agentPrefixedProjectName(incoming.Name, agentName)
if err != nil {
return fmt.Errorf("could not prefix project name: %w", err)
}
incoming.Name = prefixedName
}

return resyncHandler.ProcessRequestUpdateEvent(ctx, agentName, incoming)
case event.EventRequestResourceResync.String():
if agentMode != types.AgentModeAutonomous {
Expand Down
Loading
Loading