Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ dist/*
conf/wasm_plugin

.DS_Store
.git*
11 changes: 11 additions & 0 deletions bfe_basic/condition/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,17 @@ func buildPrimitive(node *parser.CallExpr) (Condition, error) {
fetcher: &BfeTimeFetcher{},
matcher: matcher,
}, nil

case "req_body_json_in":
return &PrimitiveCond{
name: node.Fun.Name,
node: node,
fetcher: &ReqBodyJsonFetcher{
path: node.Args[0].Value,
},
matcher: NewInMatcher(node.Args[1].Value, node.Args[2].ToBool()),
}, nil

default:
return nil, fmt.Errorf("unsupported primitive %s", node.Fun.Name)
}
Expand Down
1 change: 1 addition & 0 deletions bfe_basic/condition/parser/semant.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var funcProtos = map[string][]Token{
"req_context_value_in": {STRING, STRING, BOOL},
"bfe_time_range": []Token{STRING, STRING},
"bfe_periodic_time_range": []Token{STRING, STRING, STRING},
"req_body_json_in": []Token{STRING, STRING, BOOL},
}

func prototypeCheck(expr *CallExpr) error {
Expand Down
71 changes: 69 additions & 2 deletions bfe_basic/condition/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
"strconv"
"strings"
"time"
)

import (
"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_basic/condition/parser"
"github.com/bfenetworks/bfe/bfe_util"
"github.com/bfenetworks/bfe/bfe_util/net_util"
"github.com/spaolacci/murmur3"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)

const (
Expand Down Expand Up @@ -1089,3 +1089,70 @@ func (t *PeriodicTimeMatcher) Match(v interface{}) bool {
seconds := hour*3600 + minute*60 + second
return seconds >= t.startTime && seconds <= t.endTime
}

type ReqBodyJsonFetcher struct{
path string
}

func (pf *ReqBodyJsonFetcher) Fetch(req *bfe_basic.Request) (interface{}, error) {
return ReqBodyJsonFetch(req, pf.path)
}

func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) {
const jsonCachePrefix = "jsoncache."

if req == nil || req.HttpRequest == nil {
return "", fmt.Errorf("fetcher: nil pointer")
}

cachepath := jsonCachePrefix + path
cachedVal := req.GetContext(cachepath)
if cachedVal != nil {
str, ok := cachedVal.(string)
if ok {
return str, nil
}
}

bodyAccessor, err := req.HttpRequest.GetBodyAccessor()
if bodyAccessor == nil {
return "", err
}

body, _ := bodyAccessor.GetBytes()
val := gjson.GetBytes(body, path)
if !val.Exists() {
req.SetContext(cachepath, "")
return "", nil
}

str := val.String()
req.SetContext(cachepath, str)
return str, nil
}

func ReqBodyJsonSet(req *bfe_basic.Request, path string, value string) error {
if req == nil || req.OutRequest == nil {
return fmt.Errorf("set json body error: nil pointer")
}

bodyAccessor, err := req.OutRequest.GetBodyAccessor()
if bodyAccessor == nil {
return err
}

body, _ := bodyAccessor.GetBytes()
var newBody []byte
if path == "" {
newBody = []byte(value)
} else {
newBody, err = sjson.SetBytes(body, path, value)
if err != nil {
return fmt.Errorf("set json body error, path: %s, value: %s, err: %v", path, value, err)
}
}

bodyAccessor.SetBytes(newBody, false)

return nil
}
1 change: 1 addition & 0 deletions bfe_basic/error_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
ErrBkRetryTooMany = errors.New("BK_RETRY_TOOMANY") // reach retry max
ErrBkNoSubClusterCross = errors.New("BK_NO_SUB_CLUSTER_CROSS") // no sub-cluster found
ErrBkCrossRetryBalance = errors.New("BK_CROSS_RETRY_BALANCE") // cross retry balance failed
ErrBkBodyProcess = errors.New("BK_BODY_PROCESS") // body process error

// GSLB error
ErrGslbBlackhole = errors.New("GSLB_BLACKHOLE") // deny by blackhole
Expand Down
7 changes: 7 additions & 0 deletions bfe_config/bfe_cluster_conf/cluster_conf/cluster_conf_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ type BackendHTTPS struct {
protocol string // protocol of backend https
}

type AIConf struct {
Type int // type of LLM service, reserved for future use. should be 0 now.
ModelMapping *map[string]string // model mapping, key is model name in req, value is model name in backend
Key *string // API key for AI service
}

func (conf *BackendHTTPS) GetProtocol() string {
return conf.protocol
}
Expand Down Expand Up @@ -222,6 +228,7 @@ type ClusterConf struct {
GslbBasic *GslbBasicConf // gslb basic conf for cluster
ClusterBasic *ClusterBasicConf // basic conf for cluster
HTTPSConf *BackendHTTPS // backend's https conf
AIConf *AIConf // ai conf for cluster
}

type ClusterToConf map[string]ClusterConf
Expand Down
49 changes: 47 additions & 2 deletions bfe_http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
"github.com/bfenetworks/bfe/bfe_bufio"
"github.com/bfenetworks/bfe/bfe_net/textproto"
"github.com/bfenetworks/bfe/bfe_tls"
Expand Down Expand Up @@ -974,3 +973,49 @@ func (r *Request) closeBody() {
r.Body.Close()
}
}
func (r *Request) GetBodyAccessor() (BodyAccessor, error) {
if r.Body == nil {
return nil, nil
}
body := r.Body
for {
bodyAccessor, ok := body.(BodyAccessor)
if ok {
return bodyAccessor, nil
}

sourcer, ok := body.(SourceGetter)
if !ok {
break
}

body = sourcer.GetSource()
}

// If the body is not a BodyAccessor, we will try to convert it to a BytesBody
var err error
r.Body, err = NewBytesBody(r.Body, GetAccessibleBodySize())
if err != nil {
return nil, fmt.Errorf("can't get body")
}
return r.Body.(BodyAccessor), nil
}

const DefaultAccessibleBodySize = 1024*1024*2
const MaxAccessibleBodySize = 1024*1024*8
var accessibleBodySize = int64(DefaultAccessibleBodySize)

func SetAccessibleBodySize(size int64) {
if size <= 0 {
size = DefaultAccessibleBodySize
}
atomic.StoreInt64(&accessibleBodySize, size)
}

func GetAccessibleBodySize() int64 {
return atomic.LoadInt64(&accessibleBodySize)
}

type SourceGetter interface {
GetSource() io.ReadCloser
}
75 changes: 75 additions & 0 deletions bfe_http/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,78 @@ func parseContentLength(cl string) (int64, error) {
return n, nil

}

type BodyAccessor interface {
GetBytes() ([]byte, bool)
SetBytes([]byte, bool)
}

//body with BodyAccessor interface
type bytes_body struct {
src io.ReadCloser // source body
buf []byte // bytes read out from src
all bool // all already read out from src to buf
r io.Reader // multiReader of buf and src
}

func (b *bytes_body) Read(p []byte) (n int, err error) {
return b.r.Read(p)
}

func (b *bytes_body) Close() error {
return b.src.Close()
}

func (b *bytes_body) Peek(n int) ([]byte, error) {
if n < 0 {
return nil, fmt.Errorf("negative peek count")
}
if n > len(b.buf) {
n = len(b.buf)
}
return b.buf[:n], nil
}

func (b *bytes_body) ForcePeek(n int) ([]byte, error) {
return b.Peek(n)
}

func (b *bytes_body) GetBytes() ([]byte, bool) {
return b.buf, b.all
}

func (b *bytes_body) SetBytes(newBuf []byte, all bool) {
b.buf = newBuf
br := bytes.NewBuffer(newBuf)
b.all = b.all || all
if b.all {
b.r = br
} else {
b.r = io.MultiReader(br, b.src)
}
}

func NewBytesBody(src io.ReadCloser, maxSize int64) (*bytes_body, error) {
bb, err := io.ReadAll(io.LimitReader(src, maxSize))
if err != nil {
return nil, fmt.Errorf("io.ReadAll: %s", err.Error())
}

br := bytes.NewBuffer(bb)

if len(bb) < int(maxSize) {
return &bytes_body{
src: src,
buf: bb,
all: true,
r: br,
}, nil
} else {
return &bytes_body{
src: src,
buf: bb,
all: false,
r: io.MultiReader(br, src),
}, nil
}
}
9 changes: 8 additions & 1 deletion bfe_modules/bfe_modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package bfe_modules
import (
"github.com/bfenetworks/bfe/bfe_module"
"github.com/bfenetworks/bfe/bfe_modules/mod_access"
"github.com/bfenetworks/bfe/bfe_modules/mod_ai_token_auth"
"github.com/bfenetworks/bfe/bfe_modules/mod_auth_basic"
"github.com/bfenetworks/bfe/bfe_modules/mod_auth_jwt"
"github.com/bfenetworks/bfe/bfe_modules/mod_auth_request"
"github.com/bfenetworks/bfe/bfe_modules/mod_block"
"github.com/bfenetworks/bfe/bfe_modules/mod_body_process"
"github.com/bfenetworks/bfe/bfe_modules/mod_compress"
"github.com/bfenetworks/bfe/bfe_modules/mod_cors"
"github.com/bfenetworks/bfe/bfe_modules/mod_doh"
Expand Down Expand Up @@ -139,7 +141,12 @@ var moduleList = []bfe_module.BfeModule{

// mod_unified_waf
mod_unified_waf.NewModuleWaf(),
}

// mod_ai_token_auth
mod_ai_token_auth.NewModuleAITokenAuth(),

// mod_body_process
mod_body_process.NewModuleBodyProcess(),}

// init modules list
func InitModuleList(modules []bfe_module.BfeModule) {
Expand Down
Loading