Skip to content

Commit bfa642a

Browse files
authored
feat: Support supplying a custom sync provider for in-process flagd (#598)
Signed-off-by: Maks Osowski <[email protected]>
1 parent 629a082 commit bfa642a

7 files changed

+278
-95
lines changed

providers/flagd/README.md

+29-2
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,36 @@ openfeature.SetProvider(provider)
5757
The provider will attempt to detect file changes, but this is a best-effort attempt as file system events differ between operating systems.
5858
This mode is useful for local development, tests and offline applications.
5959

60+
#### Custom sync provider
61+
62+
In-process resolver can also be configured with a custom sync provider to change how the in-process resolver fetches flags.
63+
The custom sync provider must implement the [sync.ISync interface](https://github.com/open-feature/flagd/blob/main/core/pkg/sync/isync.go). Optional URI can be provided for the custom sync provider.
64+
65+
```go
66+
var syncProvider sync.ISync = MyAwesomeSyncProvider{}
67+
68+
provider := flagd.NewProvider(
69+
flagd.WithInProcessResolver(),
70+
flagd.WithCustomSyncProvider(syncProvider))
71+
openfeature.SetProvider(provider)
72+
```
73+
74+
```go
75+
var syncProvider sync.ISync = MyAwesomeSyncProvider{}
76+
var syncProviderUri string = "myawesome://sync.uri"
77+
78+
provider := flagd.NewProvider(
79+
flagd.WithInProcessResolver(),
80+
flagd.WithCustomSyncProviderAndUri(syncProvider, syncProviderUri))
81+
openfeature.SetProvider(provider)
82+
```
83+
6084
> [!IMPORTANT]
61-
> Note that you can only use a single flag source (either gRPC or offline file) for the in-process resolver.
62-
> If both sources are configured, offline mode will be selected.
85+
> Note that the in-process resolver can only use a single flag source.
86+
> If multiple sources are configured then only one would be selected based on the following order of preference:
87+
> 1. Custom sync provider
88+
> 2. Offline file
89+
> 3. gRPC
6390
6491
## Configuration options
6592

providers/flagd/pkg/configuration.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package flagd
22

33
import (
44
"fmt"
5-
"github.com/go-logr/logr"
6-
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
75
"os"
86
"strconv"
7+
8+
"github.com/go-logr/logr"
9+
"github.com/open-feature/flagd/core/pkg/sync"
10+
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
911
)
1012

1113
type ResolverType string
@@ -52,6 +54,8 @@ type providerConfiguration struct {
5254
Selector string
5355
SocketPath string
5456
TLSEnabled bool
57+
CustomSyncProvider sync.ISync
58+
CustomSyncProviderUri string
5559

5660
log logr.Logger
5761
}

providers/flagd/pkg/provider.go

+33-9
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,20 @@ package flagd
33
import (
44
"context"
55
"fmt"
6+
7+
parallel "sync"
8+
69
"github.com/go-logr/logr"
10+
"github.com/open-feature/flagd/core/pkg/sync"
711
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
812
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
9-
"github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
13+
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
1014
rpcService "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/rpc"
1115
of "github.com/open-feature/go-sdk/openfeature"
12-
"sync"
16+
)
17+
18+
const (
19+
defaultCustomSyncProviderUri = "syncprovider://custom"
1320
)
1421

1522
type Provider struct {
@@ -18,7 +25,7 @@ type Provider struct {
1825
providerConfiguration *providerConfiguration
1926
service IService
2027
status of.State
21-
mtx sync.RWMutex
28+
mtx parallel.RWMutex
2229

2330
eventStream chan of.Event
2431
}
@@ -71,12 +78,14 @@ func NewProvider(opts ...ProviderOption) *Provider {
7178
provider.providerConfiguration.EventStreamConnectionMaxAttempts)
7279
} else {
7380
service = process.NewInProcessService(process.Configuration{
74-
Host: provider.providerConfiguration.Host,
75-
Port: provider.providerConfiguration.Port,
76-
Selector: provider.providerConfiguration.Selector,
77-
TargetUri: provider.providerConfiguration.TargetUri,
78-
TLSEnabled: provider.providerConfiguration.TLSEnabled,
79-
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
81+
Host: provider.providerConfiguration.Host,
82+
Port: provider.providerConfiguration.Port,
83+
Selector: provider.providerConfiguration.Selector,
84+
TargetUri: provider.providerConfiguration.TargetUri,
85+
TLSEnabled: provider.providerConfiguration.TLSEnabled,
86+
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
87+
CustomSyncProvider: provider.providerConfiguration.CustomSyncProvider,
88+
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
8089
})
8190
}
8291

@@ -324,3 +333,18 @@ func FromEnv() ProviderOption {
324333
p.providerConfiguration.updateFromEnvVar()
325334
}
326335
}
336+
337+
// WithCustomSyncProvider provides a custom implementation of the sync.ISync interface used by the inProcess Service
338+
// This is only useful with inProcess resolver type
339+
func WithCustomSyncProvider(customSyncProvider sync.ISync) ProviderOption {
340+
return WithCustomSyncProviderAndUri(customSyncProvider, defaultCustomSyncProviderUri)
341+
}
342+
343+
// WithCustomSyncProvider provides a custom implementation of the sync.ISync interface used by the inProcess Service
344+
// This is only useful with inProcess resolver type
345+
func WithCustomSyncProviderAndUri(customSyncProvider sync.ISync, customSyncProviderUri string) ProviderOption {
346+
return func(p *Provider) {
347+
p.providerConfiguration.CustomSyncProvider = customSyncProvider
348+
p.providerConfiguration.CustomSyncProviderUri = customSyncProviderUri
349+
}
350+
}

providers/flagd/pkg/provider_test.go

+141-74
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,67 @@
11
package flagd
22

33
import (
4+
"testing"
5+
6+
"github.com/open-feature/flagd/core/pkg/sync"
47
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
58
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/mock"
9+
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
610
of "github.com/open-feature/go-sdk/openfeature"
711
"go.uber.org/mock/gomock"
8-
"testing"
912
)
1013

1114
func TestNewProvider(t *testing.T) {
15+
customSyncProvider := process.NewDoNothingCustomSyncProvider()
16+
1217
tests := []struct {
13-
name string
14-
expectedResolver ResolverType
15-
expectPort uint16
16-
expectHost string
17-
expectTargetUri string
18-
expectCacheType cache.Type
19-
expectCertPath string
20-
expectMaxRetries int
21-
expectCacheSize int
22-
expectOtelIntercept bool
23-
expectSocketPath string
24-
expectTlsEnabled bool
25-
options []ProviderOption
18+
name string
19+
expectedResolver ResolverType
20+
expectPort uint16
21+
expectHost string
22+
expectTargetUri string
23+
expectCacheType cache.Type
24+
expectCertPath string
25+
expectMaxRetries int
26+
expectCacheSize int
27+
expectOtelIntercept bool
28+
expectSocketPath string
29+
expectTlsEnabled bool
30+
expectCustomSyncProvider sync.ISync
31+
expectCustomSyncProviderUri string
32+
options []ProviderOption
2633
}{
2734
{
28-
name: "default construction",
29-
expectedResolver: rpc,
30-
expectPort: defaultRpcPort,
31-
expectHost: defaultHost,
32-
expectTargetUri: "",
33-
expectCacheType: defaultCache,
34-
expectCertPath: "",
35-
expectMaxRetries: defaultMaxEventStreamRetries,
36-
expectCacheSize: defaultMaxCacheSize,
37-
expectOtelIntercept: false,
38-
expectSocketPath: "",
39-
expectTlsEnabled: false,
35+
name: "default construction",
36+
expectedResolver: rpc,
37+
expectPort: defaultRpcPort,
38+
expectHost: defaultHost,
39+
expectTargetUri: "",
40+
expectCacheType: defaultCache,
41+
expectCertPath: "",
42+
expectMaxRetries: defaultMaxEventStreamRetries,
43+
expectCacheSize: defaultMaxCacheSize,
44+
expectOtelIntercept: false,
45+
expectSocketPath: "",
46+
expectTlsEnabled: false,
47+
expectCustomSyncProvider: nil,
48+
expectCustomSyncProviderUri: "",
4049
},
4150
{
42-
name: "with options",
43-
expectedResolver: inProcess,
44-
expectPort: 9090,
45-
expectHost: "myHost",
46-
expectTargetUri: "",
47-
expectCacheType: cache.LRUValue,
48-
expectCertPath: "/path",
49-
expectMaxRetries: 2,
50-
expectCacheSize: 2500,
51-
expectOtelIntercept: true,
52-
expectSocketPath: "/socket",
53-
expectTlsEnabled: true,
51+
name: "with options",
52+
expectedResolver: inProcess,
53+
expectPort: 9090,
54+
expectHost: "myHost",
55+
expectTargetUri: "",
56+
expectCacheType: cache.LRUValue,
57+
expectCertPath: "/path",
58+
expectMaxRetries: 2,
59+
expectCacheSize: 2500,
60+
expectOtelIntercept: true,
61+
expectSocketPath: "/socket",
62+
expectTlsEnabled: true,
63+
expectCustomSyncProvider: nil,
64+
expectCustomSyncProviderUri: "",
5465
options: []ProviderOption{
5566
WithInProcessResolver(),
5667
WithSocketPath("/socket"),
@@ -63,57 +74,103 @@ func TestNewProvider(t *testing.T) {
6374
},
6475
},
6576
{
66-
name: "default port handling with in-process resolver",
67-
expectedResolver: inProcess,
68-
expectPort: defaultInProcessPort,
69-
expectHost: defaultHost,
70-
expectCacheType: defaultCache,
71-
expectTargetUri: "",
72-
expectCertPath: "",
73-
expectMaxRetries: defaultMaxEventStreamRetries,
74-
expectCacheSize: defaultMaxCacheSize,
75-
expectOtelIntercept: false,
76-
expectSocketPath: "",
77-
expectTlsEnabled: false,
77+
name: "default port handling with in-process resolver",
78+
expectedResolver: inProcess,
79+
expectPort: defaultInProcessPort,
80+
expectHost: defaultHost,
81+
expectCacheType: defaultCache,
82+
expectTargetUri: "",
83+
expectCertPath: "",
84+
expectMaxRetries: defaultMaxEventStreamRetries,
85+
expectCacheSize: defaultMaxCacheSize,
86+
expectOtelIntercept: false,
87+
expectSocketPath: "",
88+
expectTlsEnabled: false,
89+
expectCustomSyncProvider: nil,
90+
expectCustomSyncProviderUri: "",
7891
options: []ProviderOption{
7992
WithInProcessResolver(),
8093
},
8194
},
8295
{
83-
name: "default port handling with in-process resolver",
84-
expectedResolver: rpc,
85-
expectPort: defaultRpcPort,
86-
expectHost: defaultHost,
87-
expectTargetUri: "",
88-
expectCacheType: defaultCache,
89-
expectCertPath: "",
90-
expectMaxRetries: defaultMaxEventStreamRetries,
91-
expectCacheSize: defaultMaxCacheSize,
92-
expectOtelIntercept: false,
93-
expectSocketPath: "",
94-
expectTlsEnabled: false,
96+
name: "default port handling with in-process resolver",
97+
expectedResolver: rpc,
98+
expectPort: defaultRpcPort,
99+
expectHost: defaultHost,
100+
expectTargetUri: "",
101+
expectCacheType: defaultCache,
102+
expectCertPath: "",
103+
expectMaxRetries: defaultMaxEventStreamRetries,
104+
expectCacheSize: defaultMaxCacheSize,
105+
expectOtelIntercept: false,
106+
expectSocketPath: "",
107+
expectTlsEnabled: false,
108+
expectCustomSyncProvider: nil,
109+
expectCustomSyncProviderUri: "",
95110
options: []ProviderOption{
96111
WithRPCResolver(),
97112
},
98113
},
99114
{
100-
name: "with target uri with in-process resolver",
101-
expectedResolver: inProcess,
102-
expectPort: defaultInProcessPort,
103-
expectHost: defaultHost,
104-
expectCacheType: defaultCache,
105-
expectTargetUri: "envoy://localhost:9211/test.service",
106-
expectCertPath: "",
107-
expectMaxRetries: defaultMaxEventStreamRetries,
108-
expectCacheSize: defaultMaxCacheSize,
109-
expectOtelIntercept: false,
110-
expectSocketPath: "",
111-
expectTlsEnabled: false,
115+
name: "with target uri with in-process resolver",
116+
expectedResolver: inProcess,
117+
expectPort: defaultInProcessPort,
118+
expectHost: defaultHost,
119+
expectCacheType: defaultCache,
120+
expectTargetUri: "envoy://localhost:9211/test.service",
121+
expectCertPath: "",
122+
expectMaxRetries: defaultMaxEventStreamRetries,
123+
expectCacheSize: defaultMaxCacheSize,
124+
expectOtelIntercept: false,
125+
expectSocketPath: "",
126+
expectTlsEnabled: false,
127+
expectCustomSyncProvider: nil,
128+
expectCustomSyncProviderUri: "",
112129
options: []ProviderOption{
113130
WithInProcessResolver(),
114131
WithTargetUri("envoy://localhost:9211/test.service"),
115132
},
116133
},
134+
{
135+
name: "with custom sync provider and uri with in-process resolver",
136+
expectedResolver: inProcess,
137+
expectPort: defaultInProcessPort,
138+
expectHost: defaultHost,
139+
expectCacheType: defaultCache,
140+
expectTargetUri: "",
141+
expectCertPath: "",
142+
expectMaxRetries: defaultMaxEventStreamRetries,
143+
expectCacheSize: defaultMaxCacheSize,
144+
expectOtelIntercept: false,
145+
expectSocketPath: "",
146+
expectTlsEnabled: false,
147+
expectCustomSyncProvider: customSyncProvider,
148+
expectCustomSyncProviderUri: "testsyncer://custom.uri",
149+
options: []ProviderOption{
150+
WithInProcessResolver(),
151+
WithCustomSyncProviderAndUri(customSyncProvider, "testsyncer://custom.uri"),
152+
},
153+
},
154+
{
155+
name: "with custom sync provider with in-process resolver",
156+
expectedResolver: inProcess,
157+
expectPort: defaultInProcessPort,
158+
expectHost: defaultHost,
159+
expectCacheType: defaultCache,
160+
expectTargetUri: "",
161+
expectCertPath: "",
162+
expectMaxRetries: defaultMaxEventStreamRetries,
163+
expectCacheSize: defaultMaxCacheSize,
164+
expectOtelIntercept: false,
165+
expectSocketPath: "",
166+
expectTlsEnabled: false,
167+
expectCustomSyncProvider: customSyncProvider,
168+
expectCustomSyncProviderUri: defaultCustomSyncProviderUri,
169+
options: []ProviderOption{
170+
WithInProcessResolver(),
171+
WithCustomSyncProvider(customSyncProvider),
172+
},
173+
},
117174
}
118175

119176
for _, test := range tests {
@@ -172,6 +229,16 @@ func TestNewProvider(t *testing.T) {
172229
test.expectTargetUri, config.TargetUri)
173230
}
174231

232+
if config.CustomSyncProvider != test.expectCustomSyncProvider {
233+
t.Errorf("incorrect configuration CustomSyncProvider, expected %v, got %v",
234+
test.expectCustomSyncProvider, config.CustomSyncProvider)
235+
}
236+
237+
if config.CustomSyncProviderUri != test.expectCustomSyncProviderUri {
238+
t.Errorf("incorrect configuration CustomSyncProviderUri, expected %v, got %v",
239+
test.expectCustomSyncProviderUri, config.CustomSyncProviderUri)
240+
}
241+
175242
// this line will fail linting if this provider is no longer compatible with the openfeature sdk
176243
var _ of.FeatureProvider = flagdProvider
177244
})

0 commit comments

Comments
 (0)