Skip to content

Commit

Permalink
sparate fetching plan from the third party and from the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mohamed-abdelrhman committed Feb 8, 2024
1 parent 07356e8 commit 320c22e
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 244 deletions.
136 changes: 136 additions & 0 deletions limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package crossover_limiter

import (
"context"
"fmt"
"github.com/google/uuid"
"log"
"net/http"
"regexp"
)

const (
DefaultRedisPoolSize = 10
UserRateKeySuffix = "-rate"
)

// Config holds configuration to passed to the plugin
type Config struct {
RequestIdPattern string
RateLimitPlanLimitURL string
APIKey string
RedisAuth string
RedisAddress string
RedisPoolSize int
}

// CreateConfig populates the config data object
func CreateConfig() *Config {
return &Config{}
}

type Limiter struct {
next http.Handler
name string
compiledPattern *regexp.Regexp
planCache IPlanCache
planProxy IPlanProxy
}

// New created a new plugin.
func New(ctx context.Context, next http.Handler, config *Config, name string) (http.Handler, error) {
if len(config.APIKey) == 0 {
return nil, fmt.Errorf("APIKey can't be empty")
}
if len(config.RequestIdPattern) == 0 {
return nil, fmt.Errorf("GetRequestIdPattern can't be empty")
}
if len(config.RateLimitPlanLimitURL) == 0 {
return nil, fmt.Errorf("RateLimitPlanLimitURL can't be empty")
}
if len(config.RedisAddress) == 0 {
return nil, fmt.Errorf("RedisAddress can't be empty")
}
if config.RedisPoolSize == 0 {
config.RedisPoolSize = DefaultRedisPoolSize
}

compiledPattern := regexp.MustCompile(config.RequestIdPattern)
planCache := NewPlanCache(config.RedisAddress, config.RedisAuth, config.RedisPoolSize)
planProxy := NewPlanProxy(config.APIKey, config.RateLimitPlanLimitURL)

handler := &Limiter{
next: next,
name: name,
compiledPattern: compiledPattern,
planCache: planCache,
planProxy: planProxy,
}

return handler, nil
}

// ServeHTTP serve http request for the users
func (a *Limiter) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
userId := a.extractUserID(req.URL.Path)
if userId == "" {
rw.WriteHeader(http.StatusBadRequest)
rw.Write([]byte("invalid requestId"))
return
}

userPlan, err := a.planCache.getUserPlan(req.Context(), userId)
if err != nil {
log.Println(err.Error())
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(err.Error()))
return
}
if err == nil && userPlan == 0 {
userPlan, err = a.planProxy.fetch(userId)
if err != nil {
log.Println(err.Error())
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte("something went wrong"))
return
}
//set user plan
err = a.planCache.setUserPlan(req.Context(), userId, userPlan)
if err != nil {
log.Println(err.Error())
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte("something went wrong"))
return
}
}

key := fmt.Sprintf("%s%s", userId, UserRateKeySuffix)
allow, err := a.planCache.limit(req.Context(), key, userPlan, 1)
if err != nil {
log.Println(err.Error())
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte("something went wrong"))
return
}
if !allow {
rw.WriteHeader(http.StatusTooManyRequests)
rw.Write([]byte("too many requests"))
return
}

a.next.ServeHTTP(rw, req)
}

// extractUserID extract user id from the request
func (a *Limiter) extractUserID(path string) (userId string) {
// Find the first match of the pattern in the URL Path
match := a.compiledPattern.FindStringSubmatch(path)
if len(match) == 0 {
return
}
parsedUUID, err := uuid.Parse(match[0][10:])
if err != nil {
return
}
return parsedUUID.String()
}
67 changes: 67 additions & 0 deletions plan_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package crossover_limiter

import (
"context"
"errors"
"fmt"
"github.com/kotalco/resp"
"strconv"
)

type IPlanCache interface {
setUserPlan(ctx context.Context, userId string, plan int) error
getUserPlan(ctx context.Context, userId string) (int, error)
limit(ctx context.Context, key string, limit int, window int) (bool, error)
}
type PlanCache struct {
resp resp.IClient
}

func NewPlanCache(address string, auth string, poolSize int) IPlanCache {
// ignore error check to run the plugin with Yaegi
// before plugin get loaded into traefik catalog it get checked with Yaegi which don't have redis-server
redisClient, _ := resp.NewRedisClient(address, poolSize, auth)

return &PlanCache{
resp: redisClient,
}
}

func (s *PlanCache) setUserPlan(ctx context.Context, userId string, plan int) error {
return s.resp.Set(ctx, userId, strconv.Itoa(plan))
}

func (s *PlanCache) getUserPlan(ctx context.Context, userId string) (int, error) {
userPlan, err := s.resp.Get(ctx, userId)
if err != nil {
return 0, err
}
if userPlan == "" {
return 0, nil
}
// Parse the user plan.
userPlanInt, err := strconv.Atoi(userPlan)
if err != nil {
return 0, errors.New(fmt.Sprintf("can't parse userPlan: %s, got error: %s", userPlan, err.Error()))
}

return userPlanInt, nil
}

func (s *PlanCache) limit(ctx context.Context, key string, limit int, window int) (bool, error) {
// Increment the counter for the given key.
count, err := s.resp.Incr(ctx, key)
if err != nil {
return false, err
}
if count == 1 {
// If the key is new or expired (i.e., count == 1), set the expiration.
_, err = s.resp.Expire(ctx, key, window)
if err != nil {
return false, err
}
}

// Check against the limit.
return count <= limit, nil
}
76 changes: 76 additions & 0 deletions plan_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package crossover_limiter

import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"time"
)

const DefaultTimeout = 5

type PlanProxyResponse struct {
Data struct {
RequestLimit int `json:"request_limit"`
} `json:"data"`
}

type IPlanProxy interface {
fetch(userId string) (int, error)
}

type PlanProxy struct {
httpClient http.Client
requestUrl *url.URL
apiKey string
}

func NewPlanProxy(apiKey string, rawUrl string) IPlanProxy {
requestUrl, err := url.Parse(rawUrl)
if err != nil {
panic(fmt.Sprintf("invalid raw plan proxy url %s: %v", rawUrl, err))
}
return &PlanProxy{
httpClient: http.Client{
Timeout: DefaultTimeout * time.Second,
},
requestUrl: requestUrl,
apiKey: apiKey,
}
}

func (proxy *PlanProxy) fetch(userId string) (int, error) {
queryParams := url.Values{}
queryParams.Set("userId", userId)
proxy.requestUrl.RawQuery = queryParams.Encode()
httpReq, err := http.NewRequest(http.MethodGet, proxy.requestUrl.String(), nil)
if err != nil {
log.Printf("FetchUserPlan:NewRequest, %s", err.Error())
return 0, errors.New("something went wrong")
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("X-Api-Key", proxy.apiKey)

httpRes, err := proxy.httpClient.Do(httpReq)
defer httpRes.Body.Close()
if err != nil {
log.Printf("FetchUserPlan:Do, %s", err.Error())
return 0, errors.New("something went wrong")
}

if httpRes.StatusCode != http.StatusOK {
log.Printf("FetchUserPlan:InvalidStatusCode: %d", httpRes.StatusCode)
return 0, errors.New("something went wrong")
}

var response PlanProxyResponse
if err = json.NewDecoder(httpRes.Body).Decode(&response); err != nil {
log.Printf("FetchUserPlan:UNMARSHAERPlan, %s", err.Error())
return 0, errors.New("something went wrong")
}

return response.Data.RequestLimit, nil
}
Loading

0 comments on commit 320c22e

Please sign in to comment.