Skip to content

Commit df6fa51

Browse files
authored
Merge pull request #69 from harness/FFM-2096
FFM-2096 Add Interface for hooking into SSE Events
2 parents c47b55f + fb0e102 commit df6fa51

File tree

5 files changed

+211
-94
lines changed

5 files changed

+211
-94
lines changed

client/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (c *CfClient) retrieve(ctx context.Context) bool {
163163
return ok
164164
}
165165

166-
func (c *CfClient) streamConnect() {
166+
func (c *CfClient) streamConnect(ctx context.Context) {
167167
// we only ever want one stream to be setup - other threads must wait before trying to establish a connection
168168
c.streamConnectedLock.Lock()
169169
defer c.streamConnectedLock.Unlock()
@@ -184,12 +184,12 @@ func (c *CfClient) streamConnect() {
184184
defer c.mux.RUnlock()
185185
c.streamConnected = false
186186
}
187-
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger, streamErr)
187+
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger, streamErr, c.config.eventStreamListener)
188188

189189
// Connect kicks off a goroutine that attempts to establish a stream connection
190190
// while this is happening we set streamConnected to true - if any errors happen
191191
// in this process streamConnected will be set back to false by the streamErr function
192-
conn.Connect(c.environmentID)
192+
conn.Connect(ctx, c.environmentID, c.sdkKey)
193193
c.streamConnected = true
194194
}
195195

@@ -307,7 +307,7 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
307307
if ok && c.config.enableStream {
308308
// here stream is enabled but not connected, so we attempt to reconnect
309309
c.config.Logger.Info("Attempting to start stream")
310-
c.streamConnect()
310+
c.streamConnect(ctx)
311311
}
312312
}
313313
c.mux.RUnlock()

client/config.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net/http"
66

77
"github.com/harness/ff-golang-server-sdk/evaluation"
8+
"github.com/harness/ff-golang-server-sdk/stream"
89

910
"github.com/harness/ff-golang-server-sdk/cache"
1011
"github.com/harness/ff-golang-server-sdk/logger"
@@ -13,16 +14,17 @@ import (
1314
)
1415

1516
type config struct {
16-
url string
17-
eventsURL string
18-
pullInterval uint // in minutes
19-
Cache cache.Cache
20-
Store storage.Storage
21-
Logger logger.Logger
22-
httpClient *http.Client
23-
enableStream bool
24-
enableStore bool
25-
target evaluation.Target
17+
url string
18+
eventsURL string
19+
pullInterval uint // in minutes
20+
Cache cache.Cache
21+
Store storage.Storage
22+
Logger logger.Logger
23+
httpClient *http.Client
24+
enableStream bool
25+
enableStore bool
26+
target evaluation.Target
27+
eventStreamListener stream.EventStreamListener
2628
}
2729

2830
func newDefaultConfig() *config {

client/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/harness/ff-golang-server-sdk/evaluation"
88
"github.com/harness/ff-golang-server-sdk/logger"
99
"github.com/harness/ff-golang-server-sdk/storage"
10+
"github.com/harness/ff-golang-server-sdk/stream"
1011
)
1112

1213
// ConfigOption is used as return value for advanced client configuration
@@ -88,3 +89,11 @@ func WithTarget(target evaluation.Target) ConfigOption {
8889
config.target = target
8990
}
9091
}
92+
93+
// WithEventStreamListener configures the SDK to forward Events from the Feature
94+
// Flag server to the passed EventStreamListener
95+
func WithEventStreamListener(e stream.EventStreamListener) ConfigOption {
96+
return func(config *config) {
97+
config.eventStreamListener = e
98+
}
99+
}

stream/sse.go

Lines changed: 162 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ import (
1717

1818
// SSEClient is Server Send Event object
1919
type SSEClient struct {
20-
api rest.ClientWithResponsesInterface
21-
client *sse.Client
22-
cache cache.Cache
23-
logger logger.Logger
24-
onStreamError func()
20+
api rest.ClientWithResponsesInterface
21+
client *sse.Client
22+
cache cache.Cache
23+
logger logger.Logger
24+
onStreamError func()
25+
eventStreamListener EventStreamListener
2526
}
2627

2728
var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -35,107 +36,188 @@ func NewSSEClient(
3536
api rest.ClientWithResponsesInterface,
3637
logger logger.Logger,
3738
onStreamError func(),
39+
eventStreamListener EventStreamListener,
3840
) *SSEClient {
3941
client.Headers["Authorization"] = fmt.Sprintf("Bearer %s", token)
4042
client.Headers["API-Key"] = apiKey
4143
client.OnDisconnect(func(client *sse.Client) {
4244
onStreamError()
4345
})
4446
sseClient := &SSEClient{
45-
client: client,
46-
cache: cache,
47-
api: api,
48-
logger: logger,
49-
onStreamError: onStreamError,
47+
client: client,
48+
cache: cache,
49+
api: api,
50+
logger: logger,
51+
onStreamError: onStreamError,
52+
eventStreamListener: eventStreamListener,
5053
}
5154
return sseClient
5255
}
5356

5457
// Connect will subscribe to SSE stream
55-
func (c *SSEClient) Connect(environment string) {
58+
func (c *SSEClient) Connect(ctx context.Context, environment string, apiKey string) {
59+
go func() {
60+
for event := range orDone(ctx, c.subscribe(ctx, environment, apiKey)) {
61+
c.handleEvent(event)
62+
}
63+
}()
64+
}
65+
66+
// Connect will subscribe to SSE stream
67+
func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey string) <-chan Event {
5668
c.logger.Infof("Start subscribing to Stream")
5769
// don't use the default exponentialBackoff strategy - we have our own disconnect logic
5870
// of polling the service then re-establishing a new stream once we can connect
5971
c.client.ReconnectStrategy = &backoff.StopBackOff{}
6072
// it is blocking operation, it needs to go in go routine
73+
74+
out := make(chan Event)
6175
go func() {
62-
err := c.client.Subscribe("*", func(msg *sse.Event) {
76+
defer close(out)
77+
78+
err := c.client.SubscribeWithContext(ctx, "*", func(msg *sse.Event) {
6379
c.logger.Infof("Event received: %s", msg.Data)
6480

65-
cfMsg := Message{}
66-
if len(msg.Data) > 0 {
67-
err := json.Unmarshal(msg.Data, &cfMsg)
68-
if err != nil {
69-
c.logger.Errorf("%s", err.Error())
70-
return
71-
}
81+
if len(msg.Data) <= 0 {
82+
return
83+
}
7284

73-
switch cfMsg.Domain {
74-
case dto.KeyFeature:
75-
// maybe is better to send event on memory bus that we get new message
76-
// and subscribe to that event
77-
switch cfMsg.Event {
78-
case dto.SseDeleteEvent:
79-
go func(identifier string) {
80-
c.cache.Remove(dto.Key{
81-
Type: dto.KeyFeature,
82-
Name: identifier,
83-
})
84-
}(cfMsg.Identifier)
85-
case dto.SsePatchEvent, dto.SseCreateEvent:
86-
fallthrough
87-
default:
88-
go func(env, identifier string) {
89-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
90-
defer cancel()
91-
response, err := c.api.GetFeatureConfigByIdentifierWithResponse(ctx, env, identifier)
92-
if err != nil {
93-
c.logger.Errorf("error while pulling flag, err: %s", err.Error())
94-
return
95-
}
96-
if response.JSON200 != nil {
97-
c.cache.Set(dto.Key{
98-
Type: dto.KeyFeature,
99-
Name: identifier,
100-
}, *response.JSON200.Convert())
101-
}
102-
}(environment, cfMsg.Identifier)
103-
}
104-
case dto.KeySegment:
105-
// need open client spec change
106-
switch cfMsg.Event {
107-
case dto.SseDeleteEvent:
108-
go func(identifier string) {
109-
c.cache.Remove(dto.Key{
110-
Type: dto.KeySegment,
111-
Name: identifier,
112-
})
113-
}(cfMsg.Identifier)
114-
case dto.SsePatchEvent, dto.SseCreateEvent:
115-
fallthrough
116-
default:
117-
go func(env, identifier string) {
118-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
119-
defer cancel()
120-
response, err := c.api.GetSegmentByIdentifierWithResponse(ctx, env, identifier)
121-
if err != nil {
122-
c.logger.Errorf("error while pulling segment, err: %s", err.Error())
123-
return
124-
}
125-
if response.JSON200 != nil {
126-
c.cache.Set(dto.Key{
127-
Type: dto.KeySegment,
128-
Name: identifier,
129-
}, response.JSON200.Convert())
130-
}
131-
}(environment, cfMsg.Identifier)
132-
}
133-
}
85+
event := Event{
86+
APIKey: apiKey,
87+
Environment: environment,
88+
SSEEvent: msg,
89+
}
90+
91+
select {
92+
case <-ctx.Done():
93+
return
94+
case out <- event:
13495
}
96+
13597
})
13698
if err != nil {
13799
c.logger.Errorf("Error initializing stream: %s", err.Error())
138100
c.onStreamError()
139101
}
140102
}()
103+
104+
return out
105+
}
106+
107+
func (c *SSEClient) handleEvent(event Event) {
108+
cfMsg := Message{}
109+
err := json.Unmarshal(event.SSEEvent.Data, &cfMsg)
110+
if err != nil {
111+
c.logger.Errorf("%s", err.Error())
112+
return
113+
}
114+
115+
switch cfMsg.Domain {
116+
case dto.KeyFeature:
117+
// maybe is better to send event on memory bus that we get new message
118+
// and subscribe to that event
119+
switch cfMsg.Event {
120+
case dto.SseDeleteEvent:
121+
122+
c.cache.Remove(dto.Key{
123+
Type: dto.KeyFeature,
124+
Name: cfMsg.Identifier,
125+
})
126+
127+
case dto.SsePatchEvent, dto.SseCreateEvent:
128+
fallthrough
129+
default:
130+
updateWithTimeout := func() {
131+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
132+
defer cancel()
133+
134+
response, err := c.api.GetFeatureConfigByIdentifierWithResponse(ctx, event.Environment, cfMsg.Identifier)
135+
if err != nil {
136+
c.logger.Errorf("error while pulling flag, err: %s", err.Error())
137+
return
138+
}
139+
140+
if response.JSON200 != nil {
141+
c.cache.Set(dto.Key{
142+
Type: dto.KeyFeature,
143+
Name: cfMsg.Identifier,
144+
}, *response.JSON200.Convert())
145+
}
146+
}
147+
148+
updateWithTimeout()
149+
}
150+
151+
case dto.KeySegment:
152+
// need open client spec change
153+
switch cfMsg.Event {
154+
case dto.SseDeleteEvent:
155+
156+
c.cache.Remove(dto.Key{
157+
Type: dto.KeySegment,
158+
Name: cfMsg.Identifier,
159+
})
160+
161+
case dto.SsePatchEvent, dto.SseCreateEvent:
162+
fallthrough
163+
default:
164+
updateWithTimeout := func() {
165+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
166+
defer cancel()
167+
168+
response, err := c.api.GetSegmentByIdentifierWithResponse(ctx, event.Environment, cfMsg.Identifier)
169+
if err != nil {
170+
c.logger.Errorf("error while pulling segment, err: %s", err.Error())
171+
return
172+
}
173+
if response.JSON200 != nil {
174+
c.cache.Set(dto.Key{
175+
Type: dto.KeySegment,
176+
Name: cfMsg.Identifier,
177+
}, response.JSON200.Convert())
178+
}
179+
}
180+
updateWithTimeout()
181+
}
182+
}
183+
184+
if c.eventStreamListener != nil {
185+
sendWithTimeout := func() error {
186+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
187+
defer cancel()
188+
return c.eventStreamListener.Pub(ctx, Event{APIKey: event.APIKey, Environment: event.Environment, SSEEvent: event.SSEEvent})
189+
}
190+
191+
if err := sendWithTimeout(); err != nil {
192+
c.logger.Errorf("error while forwarding SSE Event to change stream: %s", err)
193+
}
194+
}
195+
}
196+
197+
// orDone is a helper that encapsulates the logic for reading from a channel
198+
// whilst waiting for a cancellation.
199+
func orDone(ctx context.Context, c <-chan Event) <-chan Event {
200+
out := make(chan Event)
201+
202+
go func() {
203+
defer close(out)
204+
205+
for {
206+
select {
207+
case <-ctx.Done():
208+
return
209+
case cp, ok := <-c:
210+
if !ok {
211+
return
212+
}
213+
214+
select {
215+
case <-ctx.Done():
216+
case out <- cp:
217+
}
218+
}
219+
}
220+
}()
221+
222+
return out
141223
}

0 commit comments

Comments
 (0)