Merge pull request #572 from karlkfi/karl-status-watcher
feat: replace StatusPoller w/ StatusWatcher
karlkfi authored May 12, 2022
2 parents 02d2092 + c469493 commit 4831b66
Showing 50 changed files with 3,251 additions and 204 deletions.
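
In short, this PR removes the timer-driven StatusPoller from the apply and destroy paths and replaces it with the event-driven StatusWatcher from pkg/kstatus/watcher: the --poll-period flag and the ApplierOptions.PollInterval field are dropped, ApplierBuilder gains WithStatusWatcher (defaulting to watcher.NewDefaultStatusWatcher), and dry runs substitute the no-op watcher.BlindStatusWatcher. The snippet below is not part of the commit; it is a minimal sketch of how a caller that previously injected a custom poller might wire the new watcher, assuming the caller already holds a kubectl factory, an inventory client, a dynamic client, and a RESTMapper (the helper name buildApplier is made up for illustration).

```go
package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/dynamic"
	cmdutil "k8s.io/kubectl/pkg/cmd/util"

	"sigs.k8s.io/cli-utils/pkg/apply"
	"sigs.k8s.io/cli-utils/pkg/inventory"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
)

// buildApplier is a hypothetical helper (not part of this PR) showing the
// post-PR wiring: inject a StatusWatcher instead of a StatusPoller.
func buildApplier(
	f cmdutil.Factory,
	invClient inventory.Client,
	dynamicClient dynamic.Interface,
	mapper meta.RESTMapper,
) (*apply.Applier, error) {
	return apply.NewApplierBuilder().
		WithFactory(f).
		WithInventoryClient(invClient).
		// Before this PR: WithStatusPoller(polling.NewStatusPoller(c, mapper, polling.Options{}))
		WithStatusWatcher(watcher.NewDefaultStatusWatcher(dynamicClient, mapper)).
		Build()
}
```

Callers that never supplied a poller need no code changes: as the applier_builder.go diff below shows, the builder now falls back to watcher.NewDefaultStatusWatcher(client, mapper) when no watcher is set.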
4 changes: 3 additions & 1 deletion Makefile
@@ -101,7 +101,9 @@ test-e2e-focus: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"

.PHONY: test-stress
test-stress: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"
kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m
kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m \
--config=./test/stress/kind-cluster.yaml
kubectl wait nodes --for=condition=ready --all --timeout=5m
"$(MYGOBIN)/ginkgo" -v ./test/stress/... -- -v 3

.PHONY: vet
4 changes: 0 additions & 4 deletions cmd/apply/cmdapply.go
@@ -45,8 +45,6 @@ func GetRunner(factory cmdutil.Factory, invFactory inventory.ClientFactory,

cmd.Flags().StringVar(&r.output, "output", printers.DefaultPrinter(),
fmt.Sprintf("Output format, must be one of %s", strings.Join(printers.SupportedPrinters(), ",")))
cmd.Flags().DurationVar(&r.period, "poll-period", 2*time.Second,
"Polling period for resource statuses.")
cmd.Flags().DurationVar(&r.reconcileTimeout, "reconcile-timeout", time.Duration(0),
"Timeout threshold for waiting for all resources to reach the Current status.")
cmd.Flags().BoolVar(&r.noPrune, "no-prune", r.noPrune,
@@ -81,7 +79,6 @@ type Runner struct {

serverSideOptions common.ServerSideOptions
output string
period time.Duration
reconcileTimeout time.Duration
noPrune bool
prunePropagationPolicy string
@@ -156,7 +153,6 @@ func (r *Runner) RunE(cmd *cobra.Command, args []string) error {

ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
ServerSideOptions: r.serverSideOptions,
PollInterval: r.period,
ReconcileTimeout: r.reconcileTimeout,
// If we are not waiting for status, tell the applier to not
// emit the events.
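
With --poll-period and the period field gone, the command no longer forwards a poll interval to the applier. Below is an illustrative sketch (not taken from this diff) of what a downstream call to Applier.Run looks like after the change; the option values and the helper name applyAll are placeholders.

```go
package example

import (
	"context"
	"time"

	"sigs.k8s.io/cli-utils/pkg/apply"
	"sigs.k8s.io/cli-utils/pkg/apply/event"
	"sigs.k8s.io/cli-utils/pkg/inventory"
	"sigs.k8s.io/cli-utils/pkg/object"
)

// applyAll is a hypothetical caller of Applier.Run after this PR. ApplierOptions
// no longer carries a PollInterval; reconciliation progress is driven by the
// StatusWatcher instead of a poll timer.
func applyAll(ctx context.Context, a *apply.Applier, inv inventory.Info, objs object.UnstructuredSet) []event.Event {
	ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
		ReconcileTimeout: time.Minute, // placeholder value
		EmitStatusEvents: true,
	})
	// Drain the event channel; a real caller would hand these to a printer.
	var events []event.Event
	for e := range ch {
		events = append(events, e)
	}
	return events
}
```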
21 changes: 8 additions & 13 deletions pkg/apply/applier.go
@@ -20,18 +20,16 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/mutator"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
)

const defaultPollInterval = 2 * time.Second

// Applier performs the step of applying a set of resources into a cluster,
// conditionally waits for all of them to be fully reconciled and finally
// performs prune to clean up any resources that has been deleted.
@@ -44,7 +42,7 @@ const defaultPollInterval = 2 * time.Second
// cluster, different sets of tasks might be needed.
type Applier struct {
pruner *prune.Pruner
statusPoller poller.Poller
statusWatcher watcher.StatusWatcher
invClient inventory.Client
client dynamic.Interface
openAPIGetter discovery.OpenAPISchemaInterface
@@ -236,10 +234,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
statusWatcher := a.statusWatcher
// Disable watcher for dry runs
if opts.DryRunStrategy.ClientOrServerDryRun() {
statusWatcher = watcher.BlindStatusWatcher{}
}
runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
EmitStatusEvents: options.EmitStatusEvents,
})
if err != nil {
@@ -259,10 +261,6 @@ type ApplierOptions struct {
// how long to wait.
ReconcileTimeout time.Duration

// PollInterval defines how often we should poll for the status
// of resources.
PollInterval time.Duration

// EmitStatusEvents defines whether status events should be
// emitted on the eventChannel to the caller.
EmitStatusEvents bool
@@ -295,9 +293,6 @@ type ApplierOptions struct {
// setDefaults set the options to the default values if they
// have not been provided.
func setDefaults(o *ApplierOptions) {
if o.PollInterval == 0 {
o.PollInterval = defaultPollInterval
}
if o.PrunePropagationPolicy == "" {
o.PrunePropagationPolicy = metav1.DeletePropagationBackground
}
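
One behavioral detail from the Run hunk above: for client- or server-side dry runs the applier now swaps in watcher.BlindStatusWatcher{}, disabling status watching for that run. A caller with no use for status events could inject the same watcher explicitly. This is a sketch only, under the same assumptions as the earlier example; nothing in this PR prescribes it.

```go
package example

import (
	cmdutil "k8s.io/kubectl/pkg/cmd/util"

	"sigs.k8s.io/cli-utils/pkg/apply"
	"sigs.k8s.io/cli-utils/pkg/inventory"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
)

// newSilentApplier is a hypothetical helper: it injects the same no-op
// BlindStatusWatcher that this PR substitutes for dry runs, so no status
// watching is performed even on regular runs.
func newSilentApplier(f cmdutil.Factory, invClient inventory.Client) (*apply.Applier, error) {
	return apply.NewApplierBuilder().
		WithFactory(f).
		WithInventoryClient(invClient).
		WithStatusWatcher(watcher.BlindStatusWatcher{}).
		Build()
}
```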
21 changes: 7 additions & 14 deletions pkg/apply/applier_builder.go
@@ -13,13 +13,10 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
)

type ApplierBuilder struct {
@@ -31,7 +28,7 @@ type ApplierBuilder struct {
mapper meta.RESTMapper
restConfig *rest.Config
unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
statusPoller poller.Poller
statusWatcher watcher.StatusWatcher
}

// NewApplierBuilder returns a new ApplierBuilder.
@@ -52,7 +49,7 @@ func (b *ApplierBuilder) Build() (*Applier, error) {
Client: bx.client,
Mapper: bx.mapper,
},
statusPoller: bx.statusPoller,
statusWatcher: bx.statusWatcher,
invClient: bx.invClient,
client: bx.client,
openAPIGetter: bx.discoClient,
@@ -109,12 +106,8 @@ func (b *ApplierBuilder) finalize() (*ApplierBuilder, error) {
}
bx.unstructuredClientForMapping = bx.factory.UnstructuredClientForMapping
}
if bx.statusPoller == nil {
c, err := client.New(bx.restConfig, client.Options{Scheme: scheme.Scheme, Mapper: bx.mapper})
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}
bx.statusPoller = polling.NewStatusPoller(c, bx.mapper, polling.Options{})
if bx.statusWatcher == nil {
bx.statusWatcher = watcher.NewDefaultStatusWatcher(bx.client, bx.mapper)
}
return &bx, nil
}
@@ -154,7 +147,7 @@ func (b *ApplierBuilder) WithUnstructuredClientForMapping(unstructuredClientForM
return b
}

func (b *ApplierBuilder) WithStatusPoller(statusPoller poller.Poller) *ApplierBuilder {
b.statusPoller = statusPoller
func (b *ApplierBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher) *ApplierBuilder {
b.statusWatcher = statusWatcher
return b
}
19 changes: 10 additions & 9 deletions pkg/apply/applier_test.go
@@ -18,6 +18,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/inventory"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/multierror"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
@@ -97,7 +98,7 @@ func TestApplier(t *testing.T) {
clusterObjs object.UnstructuredSet
// options input to applier.Run
options ApplierOptions
// fake input events from the status poller
// fake input events from the statusWatcher
statusEvents []pollevent.Event
// expected output status events (async)
expectedStatusEvents []testutil.ExpEvent
@@ -1401,7 +1402,7 @@ func TestApplier(t *testing.T) {

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
poller := newFakePoller(tc.statusEvents)
statusWatcher := newFakeWatcher(tc.statusEvents)

// Only feed valid objects into the TestApplier.
// Invalid objects should not generate API requests.
@@ -1418,7 +1419,7 @@
tc.invInfo,
validObjs,
tc.clusterObjs,
poller,
statusWatcher,
)

// Context for Applier.Run
@@ -1463,7 +1464,7 @@ func TestApplier(t *testing.T) {
e.ActionGroupEvent.Action == event.PruneAction {
once.Do(func() {
// start events
poller.Start()
statusWatcher.Start()
})
}
}
@@ -1519,7 +1520,7 @@ func TestApplierCancel(t *testing.T) {
runTimeout time.Duration
// timeout for the test
testTimeout time.Duration
// fake input events from the status poller
// fake input events from the statusWatcher
statusEvents []pollevent.Event
// expected output status events (async)
expectedStatusEvents []testutil.ExpEvent
@@ -1854,13 +1855,13 @@ func TestApplierCancel(t *testing.T) {

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
poller := newFakePoller(tc.statusEvents)
statusWatcher := newFakeWatcher(tc.statusEvents)

applier := newTestApplier(t,
tc.invInfo,
tc.resources,
tc.clusterObjs,
poller,
statusWatcher,
)

// Context for Applier.Run
@@ -1902,7 +1903,7 @@ func TestApplierCancel(t *testing.T) {
e.ActionGroupEvent.Action == event.PruneAction {
once.Do(func() {
// start events
poller.Start()
statusWatcher.Start()
})
}
}
@@ -2046,7 +2047,7 @@ func TestReadAndPrepareObjects(t *testing.T) {
tc.resources,
tc.clusterObjs,
// no events needed for prepareObjects
newFakePoller([]pollevent.Event{}),
watcher.BlindStatusWatcher{},
)

applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, ApplierOptions{})
6 changes: 3 additions & 3 deletions pkg/apply/cache/resource_cache_map.go
@@ -53,11 +53,11 @@ func (rc *ResourceCacheMap) Get(id object.ObjMetadata) ResourceStatus {
defer rc.mu.RUnlock()

obj, found := rc.cache[id]
if klog.V(4).Enabled() {
if klog.V(6).Enabled() {
if found {
klog.Infof("resource cache hit: %s", id)
klog.V(6).Infof("resource cache hit: %s", id)
} else {
klog.Infof("resource cache miss: %s", id)
klog.V(6).Infof("resource cache miss: %s", id)
}
}
if !found {
23 changes: 12 additions & 11 deletions pkg/apply/common_test.go
@@ -27,12 +27,11 @@ import (
"k8s.io/klog/v2"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/jsonpath"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)

@@ -74,7 +73,7 @@ func newTestApplier(
invInfo inventoryInfo,
resources object.UnstructuredSet,
clusterObjs object.UnstructuredSet,
statusPoller poller.Poller,
statusWatcher watcher.StatusWatcher,
) *Applier {
tf := newTestFactory(t, invInfo, resources, clusterObjs)
defer tf.Cleanup()
@@ -88,7 +87,7 @@ func newTestApplier(
applier, err := NewApplierBuilder().
WithFactory(tf).
WithInventoryClient(invClient).
WithStatusPoller(statusPoller).
WithStatusWatcher(statusWatcher).
Build()
require.NoError(t, err)

@@ -103,7 +102,7 @@ func newTestDestroyer(
t *testing.T,
invInfo inventoryInfo,
clusterObjs object.UnstructuredSet,
statusPoller poller.Poller,
statusWatcher watcher.StatusWatcher,
) *Destroyer {
tf := newTestFactory(t, invInfo, object.UnstructuredSet{}, clusterObjs)
defer tf.Cleanup()
@@ -112,7 +111,7 @@

destroyer, err := NewDestroyer(tf, invClient)
require.NoError(t, err)
destroyer.StatusPoller = statusPoller
destroyer.statusWatcher = statusWatcher

return destroyer
}
@@ -345,27 +344,29 @@ func (n *nsHandler) handle(t *testing.T, req *http.Request) (*http.Response, boo
return nil, false, nil
}

type fakePoller struct {
type fakeWatcher struct {
start chan struct{}
events []pollevent.Event
}

func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
return &fakePoller{
func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
return &fakeWatcher{
events: statusEvents,
start: make(chan struct{}),
}
}

// Start events being sent on the status channel
func (f *fakePoller) Start() {
func (f *fakeWatcher) Start() {
close(f.start)
}

func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.PollOptions) <-chan pollevent.Event {
func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
eventChannel := make(chan pollevent.Event)
go func() {
defer close(eventChannel)
// send sync event immediately
eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
// wait until started to send the events
<-f.start
for _, f := range f.events {