Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch config #3

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 107 additions & 15 deletions apollo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,126 @@ package apollo

import (
"context"
"crypto/tls"

"github.com/libgox/addr"
"golang.org/x/exp/slog"
"encoding/json"
"fmt"
"net/http"
)

type Client interface {
}
GetStringValue(namespace, key string) string

type innerClient struct {
ctx context.Context
SubscribeEvent(listener Listener)
}

type Config struct {
Address addr.Address
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// Logger structured logger for logging operations
Logger *slog.Logger
type innerClient struct {
ctx context.Context
config *Config
storage *storage
poller *longPoll
listener Listener
}

func NewClient(config *Config) (Client, error) {
if config.Logger != nil {
config.Logger = slog.Default()
SetLogger(config.Logger)
}
ctx := context.Background()
c := &innerClient{
ctx: ctx,
ctx: ctx,
config: config,
storage: newStorage(config.NamespaceNames),
}
c.poller = newLongPoll(config, c.updateHandle)

// sync
err := c.poller.fetch(c.ctx)
if err != nil {
return nil, err
}

// long poll
go c.poller.start(c.ctx)

return c, nil
}

func (i *innerClient) updateHandle(notification *notification) error {
change, err := i.sync(notification)
if err != nil {
return err
}
if change == nil || len(change.Changes) == 0 {
return fmt.Errorf("no changes to sync")
}
if i.listener != nil {
i.listener.OnChange(change)
}
return nil
}

func (i *innerClient) sync(notification *notification) (*ChangeEvent, error) {
log.Infof("sync namespace %s with remote config server", notification.NamespaceName)
url := i.config.GetSyncURI(notification.NamespaceName)
r := &requester{
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: i.config.TLSConfig,
},
},
retries: 3,
}
result, err := r.do(i.ctx, url, r.retries)
if err != nil || len(result) == 0 {
return nil, err
}

ac := &apolloConfiguration{}
if err = json.Unmarshal(result, ac); err != nil {
return nil, err
}
return i.updateCache(ac)
}

func (i *innerClient) updateCache(ac *apolloConfiguration) (*ChangeEvent, error) {
var change = &ChangeEvent{
Namespace: ac.NamespaceName,
Changes: make(map[string]*Change),
}
c := i.storage.loadCache(ac.NamespaceName)

c.data.Range(func(k, v interface{}) bool {
key := k.(string)
value := v.(string)
if _, ok := ac.Configurations[key]; !ok {
c.data.Delete(key)
change.Changes[key] = onDelete(key, value)
}
return true
})

for k, v := range ac.Configurations {
old, ok := c.data.Load(k)
if !ok {
change.Changes[k] = onAdd(k, v)
c.data.Store(k, v)
continue
}
if old.(string) != v {
change.Changes[k] = onModify(k, old.(string), v)
}
c.data.Store(k, v)
}
return change, nil
}

func (i *innerClient) SubscribeEvent(listener Listener) {
i.listener = listener
}

func (i *innerClient) GetStringValue(namespace string, key string) string {
v, ok := i.storage.loadCache(namespace).data.Load(key)
if !ok {
return ""
}
return v.(string)
}
35 changes: 35 additions & 0 deletions apollo/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package apollo

import (
"testing"
"time"

"github.com/libgox/addr"
)

func TestNewClient(t *testing.T) {
c, err := NewClient(&Config{
AppID: "SampleApp",
Cluster: "default",
NamespaceNames: []string{"application", "application2"},
Address: addr.Address{
Host: "localhost",
Port: 8080,
},
Secret: "",
TLSConfig: nil,
Logger: nil,
})
if err == nil {
value := c.GetStringValue("application", "timeout")
value2 := c.GetStringValue("application2", "timeout")
c.SubscribeEvent(&ClientTest{})
t.Log(value, ",", value2)
}
time.Sleep(100 * time.Second)
}

type ClientTest struct{}

func (c *ClientTest) OnChange(event *ChangeEvent) {
}
47 changes: 47 additions & 0 deletions apollo/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package apollo

import (
"crypto/tls"
"fmt"
"net/url"

"github.com/libgox/addr"
)

type Config struct {
AppID string
Cluster string
NamespaceNames []string
Address addr.Address
Secret string
// TlsConfig configuration information for tls.
TLSConfig *tls.Config
Logger Logger
}

func (c *Config) GetNotifyURLSuffix(notifications string) string {
return fmt.Sprintf("%s/notifications/v2?appId=%s&cluster=%s&notifications=%s",
c.GetUrlPrefix(),
url.QueryEscape(c.AppID),
url.QueryEscape(c.Cluster),
url.QueryEscape(notifications))
}

func (c *Config) GetSyncURI(namespace string) string {
return fmt.Sprintf("%s/configs/%s/%s/%s?releaseKey=&ip=%s",
c.GetUrlPrefix(),
url.QueryEscape(c.AppID),
url.QueryEscape(c.Cluster),
url.QueryEscape(namespace),
GetLocalIP())
}

func (c *Config) GetUrlPrefix() string {
var urlPrefix string
if c.TLSConfig != nil {
urlPrefix = "https://" + c.Address.Addr()
} else {
urlPrefix = "http://" + c.Address.Addr()
}
return urlPrefix
}
53 changes: 53 additions & 0 deletions apollo/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package apollo

type ChangeType int

const (
ADD ChangeType = iota

MODIFY

DELETE
)

type Listener interface {
OnChange(event *ChangeEvent)
}

type Change struct {
Key string
OldValue string
NewValue string
ChangeType ChangeType
}

type ChangeEvent struct {
Namespace string
NotificationID int
Changes map[string]*Change
}

func onDelete(key, value string) *Change {
return &Change{
Key: key,
ChangeType: DELETE,
OldValue: value,
}
}

func onModify(key, oldValue, newValue string) *Change {
return &Change{
Key: key,
ChangeType: MODIFY,
OldValue: oldValue,
NewValue: newValue,
}
}

func onAdd(key, value string) *Change {
return &Change{
Key: key,
ChangeType: ADD,
NewValue: value,
}
}
57 changes: 57 additions & 0 deletions apollo/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package apollo

import (
"fmt"

"golang.org/x/exp/slog"
)

type Logger interface {
Info(format string, args ...interface{})

Error(format string, args ...interface{})

Warn(format string, args ...interface{})

Infof(format string, args ...interface{})

Errorf(format string, args ...interface{})

Warnf(format string, args ...interface{})
}

type defaultLogger struct {
Logger *slog.Logger
}

var log Logger = &defaultLogger{
Logger: slog.Default(),
}

func SetLogger(logger Logger) {
log = logger
}

func (d *defaultLogger) Info(format string, args ...interface{}) {
d.Logger.Info(format, args...)
}

func (d *defaultLogger) Error(format string, args ...interface{}) {
d.Logger.Error(format, args...)
}

func (d *defaultLogger) Warn(format string, args ...interface{}) {
d.Logger.Warn(format, args...)
}

func (d *defaultLogger) Infof(format string, args ...interface{}) {
d.Logger.Info(fmt.Sprintf(format, args...))
}

func (d *defaultLogger) Errorf(format string, args ...interface{}) {
d.Logger.Error(fmt.Sprintf(format, args...))
}

func (d *defaultLogger) Warnf(format string, args ...interface{}) {
d.Logger.Warn(fmt.Sprintf(format, args...))
}
49 changes: 49 additions & 0 deletions apollo/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package apollo

import (
"encoding/json"
"sync"
)

const defaultNotificationID int = -1

type notificationsMgr struct {
notifications sync.Map
}

type notification struct {
NamespaceName string `json:"namespaceName"`
NotificationID int `json:"notificationId"`
}

func newNotificationManager(namespaceNames []string) *notificationsMgr {
n := &notificationsMgr{
notifications: sync.Map{},
}
for _, namespaceName := range namespaceNames {
n.notifications.Store(namespaceName, defaultNotificationID)
}
return n
}

func (n *notificationsMgr) String() string {
var notifications []*notification
n.notifications.Range(func(key, value interface{}) bool {
k, _ := key.(string)
v, _ := value.(int)
notifications = append(notifications, &notification{
NamespaceName: k,
NotificationID: v,
})
return true
})
res, err := json.Marshal(&notifications)
if err != nil {
return ""
}
return string(res)
}

func (n *notificationsMgr) Store(namespaceName string, notificationID int) {
n.notifications.Store(namespaceName, notificationID)
}
Loading
Loading