Skip to content

Commit

Permalink
Extend Filter interface with Trigger() and use it for pods and nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Jul 13, 2016
1 parent 7f7ef08 commit 1d9bc58
Show file tree
Hide file tree
Showing 46 changed files with 511 additions and 77 deletions.
2 changes: 1 addition & 1 deletion examples/apiserver/rest/reststorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *RE
// Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{}
storageInterface := storageDecorator(
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc)
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} },
// NewListFunc returns an object capable of storing results of an etcd list.
Expand Down
10 changes: 9 additions & 1 deletion federation/registry/cluster/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
Expand All @@ -49,7 +50,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &federation.ClusterList{} }
storageInterface := opts.Decorator(
opts.Storage, 100, &federation.Cluster{}, prefix, cluster.Strategy, newListFunc)
opts.Storage,
100,
&federation.Cluster{},
prefix,
cluster.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &federation.Cluster{} },
Expand Down
11 changes: 10 additions & 1 deletion pkg/registry/certificates/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// REST implements a RESTStorage for CertificateSigningRequest against etcd
Expand All @@ -39,7 +40,15 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) {
prefix := "/certificatesigningrequests"

newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} }
storageInterface := opts.Decorator(opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), &certificates.CertificateSigningRequest{}, prefix, csrregistry.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests),
&certificates.CertificateSigningRequest{},
prefix,
csrregistry.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &certificates.CertificateSigningRequest{} },
Expand Down
2 changes: 2 additions & 0 deletions pkg/registry/clusterrole/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// REST implements a RESTStorage for ClusterRole against etcd
Expand All @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrole.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
Expand Down
2 changes: 2 additions & 0 deletions pkg/registry/clusterrolebinding/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// REST implements a RESTStorage for ClusterRoleBinding against etcd
Expand All @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrolebinding.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
Expand Down
6 changes: 3 additions & 3 deletions pkg/registry/configmap/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/configmap"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"

"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// REST implements a RESTStorage for ConfigMap against etcd
Expand All @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {

newListFunc := func() runtime.Object { return &api.ConfigMapList{} }
storageInterface := opts.Decorator(
opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc)
opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc, storage.NoTriggerPublisher)

store := &registry.Store{
NewFunc: func() runtime.Object {
Expand Down
10 changes: 9 additions & 1 deletion pkg/registry/controller/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// ControllerStorage includes dummy storage for Replication Controllers and for Scale subresource.
Expand Down Expand Up @@ -62,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, prefix, controller.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Controllers),
&api.ReplicationController{},
prefix,
controller.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &api.ReplicationController{} },
Expand Down
10 changes: 9 additions & 1 deletion pkg/registry/daemonset/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// rest implements a RESTStorage for DaemonSets against etcd
Expand All @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, prefix, daemonset.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets),
&extensions.DaemonSet{},
prefix,
daemonset.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.DaemonSet{} },
Expand Down
9 changes: 8 additions & 1 deletion pkg/registry/deployment/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {

newListFunc := func() runtime.Object { return &extensions.DeploymentList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, prefix, deployment.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Deployments),
&extensions.Deployment{},
prefix,
deployment.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.Deployment{} },
Expand Down
10 changes: 9 additions & 1 deletion pkg/registry/endpoint/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
Expand All @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {

newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, prefix, endpoint.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints),
&api.Endpoints{},
prefix,
endpoint.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
Expand Down
34 changes: 28 additions & 6 deletions pkg/registry/generic/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// AttrFunc returns label and field sets for List or Watch to compare against, or an error.
Expand Down Expand Up @@ -50,9 +51,10 @@ func MergeFieldsSets(source fields.Set, fragment fields.Set) fields.Set {
// SelectionPredicate implements a generic predicate that can be passed to
// GenericRegistry's List or Watch methods. Implements the Matcher interface.
type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexFields []string
}

// Matches returns true if the given object's labels and fields (as
Expand All @@ -79,6 +81,20 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
return "", false
}

// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []storage.MatchValue {
var result []storage.MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, storage.MatchValue{IndexName: field, Value: value})
}
}
return result
}

// Matcher can return true if an object matches the Matcher's selection
// criteria. If it is known that the matcher will match only a single object
// then MatchesSingle should return the key of that object and true. This is an
Expand All @@ -93,9 +109,10 @@ type Matcher interface {
// include the object's namespace.
MatchesSingle() (key string, matchesSingleObject bool)

// TODO: when we start indexing objects, add something like the below:
// MatchesIndices() (indexName []string, indexValue []string)
// where indexName/indexValue are the same length.
// For any known index, if a matcher can match only (a subset) of objects
// that return <value> for a given index, a pair (<index name>, <value>)
// will be returned.
MatcherIndex() []storage.MatchValue
}

// MatcherFunc makes a matcher from the provided function. For easy definition
Expand All @@ -117,6 +134,11 @@ func (m matcherFunc) MatchesSingle() (string, bool) {
return "", false
}

// MatcherIndex always returns empty list.
func (m matcherFunc) MatcherIndex() []storage.MatchValue {
return nil
}

// MatchOnKey returns a matcher that will send only the object matching key
// through the matching function f. For testing!
// Note: use SelectionPredicate above for real code!
Expand Down
16 changes: 9 additions & 7 deletions pkg/registry/generic/registry/storage_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ func StorageWithCacher(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface {
newListFunc func() runtime.Object,
triggerFunc storage.TriggerPublisherFunc) storage.Interface {

config := storage.CacherConfig{
CacheCapacity: capacity,
Storage: storageInterface,
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
CacheCapacity: capacity,
Storage: storageInterface,
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
TriggerPublisherFunc: triggerFunc,
}
if scopeStrategy.NamespaceScoped() {
config.KeyFunc = func(obj runtime.Object) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/registry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func (e *Store) createFilter(m generic.Matcher) storage.Filter {
}
return matches
}
return storage.NewSimpleFilter(filterFunc)
return storage.NewSimpleFilter(filterFunc, m.MatcherIndex)
}

// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
Expand Down
8 changes: 8 additions & 0 deletions pkg/registry/generic/registry/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (sm setMatcher) MatchesSingle() (string, bool) {
return "", false
}

func (sm setMatcher) MatcherIndex() []storage.MatchValue {
return nil
}

// everythingMatcher matches everything
type everythingMatcher struct{}

Expand All @@ -116,6 +120,10 @@ func (everythingMatcher) MatchesSingle() (string, bool) {
return "", false
}

func (everythingMatcher) MatcherIndex() []storage.MatchValue {
return nil
}

func TestStoreList(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"},
Expand Down
6 changes: 4 additions & 2 deletions pkg/registry/generic/storage_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type StorageDecorator func(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface

// Returns given 'storageInterface' without any decoration.
func UndecoratedStorage(
Expand All @@ -39,6 +40,7 @@ func UndecoratedStorage(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface {
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface {
return storageInterface
}
10 changes: 9 additions & 1 deletion pkg/registry/horizontalpodautoscaler/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/horizontalpodautoscaler"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
Expand All @@ -37,7 +38,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), &autoscaling.HorizontalPodAutoscaler{}, prefix, horizontalpodautoscaler.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers),
&autoscaling.HorizontalPodAutoscaler{},
prefix,
horizontalpodautoscaler.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} },
Expand Down
10 changes: 9 additions & 1 deletion pkg/registry/ingress/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
ingress "k8s.io/kubernetes/pkg/registry/ingress"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// rest implements a RESTStorage for replication controllers against etcd
Expand All @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &extensions.IngressList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), &extensions.Ingress{}, prefix, ingress.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Ingress),
&extensions.Ingress{},
prefix,
ingress.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)

store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.Ingress{} },
Expand Down
Loading

0 comments on commit 1d9bc58

Please sign in to comment.