diff --git a/changelog/v0.27.3/adding-comments-around-cache-and-server.yaml b/changelog/v0.27.3/adding-comments-around-cache-and-server.yaml new file mode 100644 index 000000000..e6869b72e --- /dev/null +++ b/changelog/v0.27.3/adding-comments-around-cache-and-server.yaml @@ -0,0 +1,3 @@ +changelog: + - type: NON_USER_FACING + description: update comments diff --git a/pkg/api/v1/control-plane/cache/cache.go b/pkg/api/v1/control-plane/cache/cache.go index ce30d51ce..80a6012e1 100644 --- a/pkg/api/v1/control-plane/cache/cache.go +++ b/pkg/api/v1/control-plane/cache/cache.go @@ -32,7 +32,7 @@ type Request = envoy_service_discovery_v3.DiscoveryRequest // // roughly copied from https://github.com/envoyproxy/go-control-plane/blob/dcf5642c8e54496938e0311fe9c48e39b609e583/pkg/cache/v3/cache.go#L45 type ConfigWatcher interface { - // CreateWatch returns a new open watch from a non-empty request. + // CreateWatch returns a new open watch from a non-empty request to receive a response. // // Value channel produces requested resources, once they are available. If // the channel is closed prior to cancellation of the watch, an unrecoverable @@ -60,15 +60,15 @@ type Cache interface { GetStatusKeys() []string } -// Response is a pre-serialized xDS response. +// Response is a pre-serialized xDS response. It contains the updated resources for the original xDS request, the xDS request, and the version. type Response struct { - // Request is the original request. + // Request is the original xDS request. Request envoy_service_discovery_v3.DiscoveryRequest // Version of the resources as tracked by the cache for the given type. // Proxy responds with this version as an acknowledgement. Version string - // Resources to be included in the response. + // Resources to be included in the response. Should match the requested resources in the xDS request. Resources []Resource } diff --git a/pkg/api/v1/control-plane/cache/simple.go b/pkg/api/v1/control-plane/cache/simple.go index 23df1010e..991aec6b8 100644 --- a/pkg/api/v1/control-plane/cache/simple.go +++ b/pkg/api/v1/control-plane/cache/simple.go @@ -131,7 +131,13 @@ func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache } } -// SetSnapshotCache updates a snapshot for a node. +// SetSnapshotCache sets the node's snapshot in the cache. Then submits a response snapshot for any open watches. +// +// Looks for any response watches awaiting for a response and submits a response based off the snapshots resources. +// A response is sent if the version is different than the request version. +// This will clean the watch from the watches. +// +// Each snapshot will only submit resources based off the requests type URL. func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) { cache.mu.Lock() defer cache.mu.Unlock() @@ -217,7 +223,19 @@ func SupersetWithResource(names map[string]Resource, resources map[string]Resour return nil } -// CreateWatch returns a watch for an xDS request. +// CreateWatch returns a response channel for a xDS request and the cancel function. +// A response will be generated by the cache, and sent through the response channel asynchronously. +// +// If the cache snapshot does not exist or the version of the request matches the version of the snapshot, +// create a response watch with and store it in the info watches map so that other processes can send +// information once the snapshot resources exist. IE: SetSnapshot() +// +// Else, if the snapshot exists a response will be submitted to the response channel. +// +// The channel can be used to respond to responses prepared for a request. +// +// Each watch (Response channel) is given an unique watch ID. +// The node from the request is used to get the nodeID from the cache, this is used to get the set of response watches for the node. func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func()) { nodeID := cache.hash.ID(request.Node) @@ -252,8 +270,10 @@ func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func()) request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) } info.mu.Lock() + // check SetSnapshot() for responses on the watches map info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() + // clean the watch from the node watches return value, cache.cancelWatch(nodeID, watchID) } @@ -265,11 +285,13 @@ func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func()) } } +// nextWatchID returns the next watch id func (cache *snapshotCache) nextWatchID() int64 { return atomic.AddInt64(&cache.watchCount, 1) } -// cancellation function for cleaning stale watches +// cancelWatch is used to close the response watch and delete it from the nodes set of watches. +// This cleans the watches. func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { return func() { // uses the cache mutex @@ -286,7 +308,10 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { } } -// Respond to a watch with the snapshot value. The value channel should have capacity not to block. +// respond will create a response and submit it to the value channel. The Response will contain the filtered resources that are requested. +// +// if the cache is ADS and the request contains any resources, the resource list must match all the names in the request resource names. +// Records the request type URL. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 func (cache *snapshotCache) respond(request Request, value chan Response, resources map[string]Resource, version string) { // for ADS, the request names must match the snapshot names @@ -309,6 +334,7 @@ func (cache *snapshotCache) respond(request Request, value chan Response, resour value <- createResponse(request, resources, version) } +// createResponse creates a Response based off the requested resources. func createResponse(request Request, resources map[string]Resource, version string) Response { filtered := make([]Resource, 0, len(resources)) diff --git a/pkg/api/v1/control-plane/cache/snapshot.go b/pkg/api/v1/control-plane/cache/snapshot.go index 7434494ce..d18203f95 100644 --- a/pkg/api/v1/control-plane/cache/snapshot.go +++ b/pkg/api/v1/control-plane/cache/snapshot.go @@ -40,10 +40,12 @@ func NewResources(version string, items []Resource) Resources { } } +// Snapshot is a snashot of a cached set of resources. type Snapshot interface { Consistent() error // MakeConsistent should never be called on a generic snapshot as it is not used for snapshots with dependent resources. MakeConsistent() + // GetResources will return the resources based off the type. GetResources(typ string) Resources // Clone shouldn't be called on a generic snapshot until https://github.com/solo-io/solo-kit/issues/461 is resolved. Clone() Snapshot diff --git a/pkg/api/v1/control-plane/cache/status.go b/pkg/api/v1/control-plane/cache/status.go index a96c07875..453fb216d 100644 --- a/pkg/api/v1/control-plane/cache/status.go +++ b/pkg/api/v1/control-plane/cache/status.go @@ -40,6 +40,7 @@ type StatusInfo interface { GetLastWatchRequestTime() time.Time } +// statusInfo contains all the response watches for a node. type statusInfo struct { // node is the constant Envoy node metadata. node *envoy_config_core_v3.Node @@ -60,7 +61,7 @@ type ResponseWatch struct { // Request is the original request for the watch. Request Request - // Response is the channel to push response to. + // Response is the channel to push a response to. Response chan Response } diff --git a/pkg/api/v1/control-plane/server/generic_server.go b/pkg/api/v1/control-plane/server/generic_server.go index ed4816918..8d7edb06f 100644 --- a/pkg/api/v1/control-plane/server/generic_server.go +++ b/pkg/api/v1/control-plane/server/generic_server.go @@ -96,41 +96,57 @@ func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Ser return &server{ctx: ctx, cache: config, callbacks: callbacks} } +// server sends requests to the cache to be fullfilled. +// This generates responses asynchronously. +// Responses are submitted back to the Envoy client or fetched from the client. type server struct { - cache cache.Cache + // cache is an interface to handle resource cache, and to create channels when the resources are updated. + cache cache.Cache + // callbacks are pre and post callback used when responses and requests sent/received. callbacks Callbacks - ctx context.Context - + // ctx is the context in which the server is alive. + ctx context.Context // streamCount for counting bi-di streams streamCount int64 } +// singleWatch contians a channel that can be used to watch for new responses from the cache. type singleWatch struct { - resource chan cache.Response + // resource is the channel used to receive the response. + resource chan cache.Response + // resourceCancel is a function that allows you to close the resource channel. resourceCancel func() - resourceNonce string + // resourceNonce is the nonce used to identify the response to a request. + resourceNonce string } // watches for all xDS resource types type watches struct { - curerntwatches map[string]singleWatch + // currentwatches are the response channels used for each resource type. + // Currently the only purpose is to cancel the stored channels from the Cancel() function. + currentwatches map[string]singleWatch } +// newWatches returns an instantiation of watches func newWatches() *watches { return &watches{ - curerntwatches: map[string]singleWatch{}, + currentwatches: map[string]singleWatch{}, } } // Cancel all watches func (values *watches) Cancel() { - for _, v := range values.curerntwatches { + for _, v := range values.currentwatches { if v.resourceCancel != nil { v.resourceCancel() } } } +// createResponse will use the response (Envoy Request with updated resources) to serialize the resources and create an Envoy Response. +// +// The response tells Envoy that the current SotW, based off the Envoy Requested resources. +// The response contains its assocaited request. func createResponse(resp *cache.Response, typeURL string) (*envoy_service_discovery_v3.DiscoveryResponse, error) { if resp == nil { return nil, errors.New("missing response") @@ -154,11 +170,19 @@ func createResponse(resp *cache.Response, typeURL string) (*envoy_service_discov return out, nil } +// TypedResponse contains the response from a xDS request with the typeURL type TypedResponse struct { + // Response is the response from a request Response *cache.Response - TypeUrl string + // TypeUrl is the type Url of the xDS request + TypeUrl string } +// StreamEnvoyV3 will create a request channel that will receive requests from the streams Recv() function. +// It will then set up the processes to handle requests when they are received, so that the server can respond to the requests. +// The defaultTypeURL is used to identify the type of the resources that the Envoy stream is watching for. +// +// process is called to handle both the request and the response to the request. It does this by sending the response onto the stream. func (s *server) StreamEnvoyV3( stream StreamEnvoyV3, defaultTypeURL string, @@ -240,6 +264,10 @@ func (s *server) sendSolo( } } +// sendEnvoyV3 returns a function that is called to send an Envoy response. The cahe response is used to create an Envoy response and update the nonce. +// +// It will then send the response to the stream. +// This will handle any callbacks on the server as well. func (s *server) sendEnvoyV3( stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer, ) sendFunc { @@ -259,7 +287,15 @@ func (s *server) sendEnvoyV3( } } -// process handles a bi-di stream request +// process handles both the request and the response of an Envoy request. +// +// For each request submitted onto the request channel, wait for the corresponding response on the response channel. +// +// For each response received from a request submitted, the send function to send the +// response and translates it to an Envoy Response and send it back on the Envoy client. +// +// Requests are received from the servers stream.Recv() function +// Callbacks are handled as well. func (s *server) process( ctx context.Context, send sendFunc, @@ -297,6 +333,7 @@ func (s *server) process( case <-s.ctx.Done(): return nil // config watcher can send the requested resources types in any order + // responses come from requests submitted to createWatch() case resp, more := <-responses: if !more { return status.Errorf(codes.Unavailable, "watching failed") @@ -305,13 +342,14 @@ func (s *server) process( return status.Errorf(codes.Unavailable, "watching failed for "+resp.TypeUrl) } typeurl := resp.TypeUrl + // send the response of a request nonce, err := send(*resp.Response, typeurl, streamID, &streamNonce) if err != nil { return err } - sw := values.curerntwatches[typeurl] + sw := values.currentwatches[typeurl] sw.resourceNonce = nonce - values.curerntwatches[typeurl] = sw + values.currentwatches[typeurl] = sw case req, more := <-reqCh: // input stream ended or errored out @@ -348,20 +386,26 @@ func (s *server) process( // cancel existing watches to (re-)request a newer version typeurl := req.TypeUrl - sw := values.curerntwatches[typeurl] + sw := values.currentwatches[typeurl] if sw.resourceNonce == "" || sw.resourceNonce == nonce { if sw.resourceCancel != nil { sw.resourceCancel() } - sw.resource, sw.resourceCancel = s.createWatch(responses, req) - values.curerntwatches[typeurl] = sw + // wait for a response on the respones channel. Send the request to generate the response asynchronously. + sw.resource, sw.resourceCancel = s.createResponseWatch(responses, req) + values.currentwatches[typeurl] = sw } } } } -func (s *server) createWatch(responses chan<- TypedResponse, req *cache.Request) (chan cache.Response, func()) { +// createResponseWatch returns a channel for the response of a request and the cancel function. +// A request is used to generate an async responce that is submitted to the respones channel. +// +// It creates a go routine to send responses onto the respones channel. +// If the watch created canceled, then it will close the go routine, else there was an error and a nil response is sent. +func (s *server) createResponseWatch(responses chan<- TypedResponse, req *cache.Request) (chan cache.Response, func()) { typeurl := req.TypeUrl watchedResource, cancelwatch := s.cache.CreateWatch(*req) @@ -386,6 +430,7 @@ func (s *server) createWatch(responses chan<- TypedResponse, req *cache.Request) case <-canceled: // this was canceled. goodbye return + // receive responses for the requested resources case response, ok := <-watchedResource: if !ok { // resource chan is closed. this may have happened due to cancel,