Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,11 +663,7 @@ func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
updateErr := sc.updateResource(task)
if apierrors.IsConflict(updateErr) {
sc.log.WithValues("task", task).V(1).Info("Retrying hook finalizer removal due to conflict on update")
resIf, err := sc.getResourceIf(task, "get")
if err != nil {
return fmt.Errorf("failed to get resource interface: %w", err)
}
liveObj, err := resIf.Get(context.TODO(), task.liveObj.GetName(), metav1.GetOptions{})
liveObj, err := sc.getResource(task)
if apierrors.IsNotFound(err) {
sc.log.WithValues("task", task).V(1).Info("Resource is already deleted")
return nil
Expand All @@ -687,6 +683,19 @@ func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
})
}

func (sc *syncContext) getResource(task *syncTask) (*unstructured.Unstructured, error) {
sc.log.WithValues("task", task).V(1).Info("Getting resource")
resIf, err := sc.getResourceIf(task, "get")
if err != nil {
return nil, err
}
liveObj, err := resIf.Get(context.TODO(), task.name(), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get resource: %w", err)
}
return liveObj, nil
}

func (sc *syncContext) updateResource(task *syncTask) error {
sc.log.WithValues("task", task).V(1).Info("Updating resource")
resIf, err := sc.getResourceIf(task, "update")
Expand Down Expand Up @@ -1367,6 +1376,31 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
createTasks = append(createTasks, task)
}
}

// remove finalizers from previous sync on existing hooks to make sure the operation is idempotent
{
ss := newStateSync(state)
existingHooks := tasks.Filter(func(t *syncTask) bool { return t.isHook() && t.pending() && t.liveObj != nil })
for _, task := range existingHooks {
t := task
ss.Go(func(state runState) runState {
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
logCtx.V(1).Info("Removing finalizers")
if !dryRun {
if err := sc.removeHookFinalizer(t); err != nil {
state = failed
sc.setResourceResult(t, t.syncStatus, common.OperationError, fmt.Sprintf("failed to remove hook finalizer: %v", err))
}
}
return state
})
}
state = ss.Wait()
}
if state != successful {
return state
}

// prune first
{
if !sc.pruneConfirmed {
Expand Down Expand Up @@ -1418,15 +1452,19 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
for _, task := range hooksPendingDeletion {
t := task
ss.Go(func(state runState) runState {
sc.log.WithValues("dryRun", dryRun, "task", t).V(1).Info("Deleting")
log := sc.log.WithValues("dryRun", dryRun, "task", t).V(1)
log.Info("Deleting")
if !dryRun {
err := sc.deleteResource(t)
if err != nil {
// it is possible to get a race condition here, such that the resource does not exist when
// delete is requested, we treat this as a nop
// delete is requested, we treat this as a nopand remove the liveObj
if !apierrors.IsNotFound(err) {
state = failed
sc.setResourceResult(t, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
sc.setResourceResult(t, t.syncStatus, common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
} else {
log.Info("Resource not found, treating as no-op and removing liveObj")
t.liveObj = nil
}
} else {
// if there is anything that needs deleting, we are at best now in pending and
Expand Down
119 changes: 107 additions & 12 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ func newTestSyncCtx(getResourceFunc *func(ctx context.Context, config *rest.Conf
&metav1.APIResourceList{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Kind: "Pod", Group: "", Version: "v1", Namespaced: true, Verbs: standardVerbs},
{Kind: "Service", Group: "", Version: "v1", Namespaced: true, Verbs: standardVerbs},
{Kind: "Namespace", Group: "", Version: "v1", Namespaced: false, Verbs: standardVerbs},
{Name: "pods", Kind: "Pod", Group: "", Version: "v1", Namespaced: true, Verbs: standardVerbs},
{Name: "services", Kind: "Service", Group: "", Version: "v1", Namespaced: true, Verbs: standardVerbs},
{Name: "namespaces", Kind: "Namespace", Group: "", Version: "v1", Namespaced: false, Verbs: standardVerbs},
},
},
&metav1.APIResourceList{
GroupVersion: "apps/v1",
APIResources: []metav1.APIResource{
{Kind: "Deployment", Group: "apps", Version: "v1", Namespaced: true, Verbs: standardVerbs},
{Name: "deployments", Kind: "Deployment", Group: "apps", Version: "v1", Namespaced: true, Verbs: standardVerbs},
},
})
sc := syncContext{
Expand Down Expand Up @@ -854,6 +854,39 @@ func withReplaceAndServerSideApplyAnnotations(un *unstructured.Unstructured) *un
return un
}

func TestSync_HookWithReplaceAndBeforeHookCreation_AlreadyDeleted(t *testing.T) {
// This test a race condition when Delete is called on an already deleted object
// LiveObj is set, but then the resource is deleted asynchronously in kubernetes
syncCtx := newTestSyncCtx(nil)

target := withReplaceAnnotation(testingutils.NewPod())
target.SetNamespace(testingutils.FakeArgoCDNamespace)
target = testingutils.Annotate(target, synccommon.AnnotationKeyHookDeletePolicy, string(synccommon.HookDeletePolicyBeforeHookCreation))
target = testingutils.Annotate(target, synccommon.AnnotationKeyHook, string(synccommon.SyncPhasePreSync))
live := target.DeepCopy()

syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{live},
Target: []*unstructured.Unstructured{target},
})
syncCtx.hooks = []*unstructured.Unstructured{live}

client := fake.NewSimpleDynamicClient(runtime.NewScheme())
deleted := false
client.PrependReactor("delete", "pods", func(_ testcore.Action) (bool, runtime.Object, error) {
deleted = true
// simulate the race conditions where liveObj was not null, but is now deleted in k8s
return true, nil, apierrors.NewNotFound(corev1.Resource("pods"), live.GetName())
})
syncCtx.dynamicIf = client

syncCtx.Sync()

resourceOps, _ := syncCtx.resourceOps.(*kubetest.MockResourceOps)
assert.Equal(t, "create", resourceOps.GetLastResourceCommand(kube.GetResourceKey(target)))
assert.True(t, deleted)
}

func TestSync_ServerSideApply(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -1285,22 +1318,84 @@ func TestSyncFailureHookWithFailedSync(t *testing.T) {
}

func TestBeforeHookCreation(t *testing.T) {
finalizerRemoved := false
syncCtx := newTestSyncCtx(nil)
hook := testingutils.Annotate(testingutils.Annotate(testingutils.NewPod(), synccommon.AnnotationKeyHook, "Sync"), synccommon.AnnotationKeyHookDeletePolicy, "BeforeHookCreation")
hook.SetNamespace(testingutils.FakeArgoCDNamespace)
hookObj := testingutils.Annotate(testingutils.Annotate(testingutils.NewPod(), synccommon.AnnotationKeyHook, "Sync"), synccommon.AnnotationKeyHookDeletePolicy, "BeforeHookCreation")
hookObj.SetFinalizers([]string{hook.HookFinalizer})
hookObj.SetNamespace(testingutils.FakeArgoCDNamespace)
syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{hook},
Live: []*unstructured.Unstructured{hookObj},
Target: []*unstructured.Unstructured{nil},
})
syncCtx.hooks = []*unstructured.Unstructured{hook}
syncCtx.dynamicIf = fake.NewSimpleDynamicClient(runtime.NewScheme())
syncCtx.hooks = []*unstructured.Unstructured{hookObj}
client := fake.NewSimpleDynamicClient(runtime.NewScheme(), hookObj)
client.PrependReactor("update", "pods", func(_ testcore.Action) (bool, runtime.Object, error) {
finalizerRemoved = true
return false, nil, nil
})
syncCtx.dynamicIf = client

// First sync will delete the existing hook
syncCtx.Sync()
phase, _, _ := syncCtx.GetState()
assert.Equal(t, synccommon.OperationRunning, phase)
assert.True(t, finalizerRemoved)

_, _, resources := syncCtx.GetState()
// Second sync will create the hook
syncCtx.Sync()
phase, message, resources := syncCtx.GetState()
assert.Equal(t, synccommon.OperationRunning, phase)
assert.Len(t, resources, 1)
assert.Empty(t, resources[0].Message)
assert.Equal(t, "waiting for completion of hook /Pod/my-pod", syncCtx.message)
assert.Equal(t, synccommon.OperationRunning, resources[0].HookPhase)
assert.Equal(t, "waiting for completion of hook /Pod/my-pod", message)
}

func TestSync_ExistingHooksWithFinalizer(t *testing.T) {
newHook := func(name string, hookType synccommon.HookType, deletePolicy synccommon.HookDeletePolicy) *unstructured.Unstructured {
obj := testingutils.NewPod()
obj.SetName(name)
obj.SetNamespace(testingutils.FakeArgoCDNamespace)
testingutils.Annotate(obj, synccommon.AnnotationKeyHook, string(hookType))
testingutils.Annotate(obj, synccommon.AnnotationKeyHookDeletePolicy, string(deletePolicy))
obj.SetFinalizers([]string{hook.HookFinalizer})
return obj
}

hook1 := newHook("existing-hook-1", synccommon.HookTypePreSync, synccommon.HookDeletePolicyBeforeHookCreation)
hook2 := newHook("existing-hook-2", synccommon.HookTypePreSync, synccommon.HookDeletePolicyHookFailed)
hook3 := newHook("existing-hook-3", synccommon.HookTypePreSync, synccommon.HookDeletePolicyHookSucceeded)

syncCtx := newTestSyncCtx(nil)
fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme(), hook1, hook2, hook3)
syncCtx.dynamicIf = fakeDynamicClient
updatedCount := 0
fakeDynamicClient.PrependReactor("update", "*", func(_ testcore.Action) (handled bool, ret runtime.Object, err error) {
// Removing the finalizers
updatedCount++
return false, nil, nil
})
deletedCount := 0
fakeDynamicClient.PrependReactor("delete", "*", func(_ testcore.Action) (handled bool, ret runtime.Object, err error) {
// because of HookDeletePolicyBeforeHookCreation
deletedCount++
return false, nil, nil
})
syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{hook1, hook2, hook3},
Target: []*unstructured.Unstructured{nil, nil, nil},
})
syncCtx.hooks = []*unstructured.Unstructured{hook1, hook2, hook3}

syncCtx.Sync()
phase, _, _ := syncCtx.GetState()

assert.Equal(t, synccommon.OperationRunning, phase)
assert.Equal(t, 3, updatedCount)
assert.Equal(t, 1, deletedCount)

_, err := syncCtx.getResource(&syncTask{liveObj: hook1})
require.Error(t, err, "Expected resource to be deleted")
assert.True(t, apierrors.IsNotFound(err))
}

func TestRunSyncFailHooksFailed(t *testing.T) {
Expand Down