Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ RUN set -ex; \
#!/bin/sh
set -eu

CONF_AGENT_PID=""

# Log function
log() {
echo "[$(date +'%Y-%m-%d %H:%M:%S')] $*"
Expand All @@ -108,7 +110,7 @@ start_conf_agent() {
start_bfe() {
log "Starting bfe..."
cd /home/work/bfe/bin
exec ./bfe -c ../conf/ -l ../log/ -s
exec ./bfe -c ../conf/ -l ../log/ -s__BFE_DEBUG_FLAG__
}

# Signal handler
Expand Down Expand Up @@ -143,6 +145,13 @@ sleep 2
start_bfe
EOF

RUN set -ex; \
if [ "${VARIANT}" = "debug" ]; then \
sed -i 's/__BFE_DEBUG_FLAG__/ -d debug/g' /home/work/entrypoint.sh; \
else \
sed -i 's/__BFE_DEBUG_FLAG__//g' /home/work/entrypoint.sh; \
fi

RUN chmod +x /home/work/entrypoint.sh

EXPOSE 8080 8443 8421
Expand Down
2 changes: 1 addition & 1 deletion bfe_basic/condition/parser/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (s *Scanner) scanEscape(quote rune) bool {
var n int
var base, max uint32
switch s.ch {
case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', quote:
case 'a', 'b', 'f', 'n', 'r', 't', 'v', '.', '\\', quote:
s.next()
return true
case '0', '1', '2', '3', '4', '5', '6', '7':
Expand Down
31 changes: 24 additions & 7 deletions bfe_basic/condition/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_basic/condition/parser"
"github.com/bfenetworks/bfe/bfe_http"
"github.com/bfenetworks/bfe/bfe_util"
"github.com/bfenetworks/bfe/bfe_util/net_util"
"github.com/spaolacci/murmur3"
Expand Down Expand Up @@ -1095,10 +1096,10 @@ type ReqBodyJsonFetcher struct{
}

func (pf *ReqBodyJsonFetcher) Fetch(req *bfe_basic.Request) (interface{}, error) {
return ReqBodyJsonFetch(req, pf.path)
return ReqBodyJsonFetch(req, pf.path, nil)
}

func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) {
func ReqBodyJsonFetch(req *bfe_basic.Request, path string, httpreq *bfe_http.Request) (string, error) {
const jsonCachePrefix = "jsoncache."

if req == nil || req.HttpRequest == nil {
Expand All @@ -1114,21 +1115,37 @@ func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) {
}
}

bodyAccessor, err := req.HttpRequest.GetBodyAccessor()
if httpreq == nil {
httpreq = req.HttpRequest
}

str, err := HttpReqBodyJsonGet(httpreq, path)

if err != nil {
return "", err
}

req.SetContext(cachepath, str)
return str, nil
}

func HttpReqBodyJsonGet(req *bfe_http.Request, path string) (string, error) {
if req == nil {
return "", fmt.Errorf("fetcher: nil pointer")
}

bodyAccessor, err := req.GetBodyAccessor()
if bodyAccessor == nil {
return "", err
}

body, _ := bodyAccessor.GetBytes()
val := gjson.GetBytes(body, path)
if !val.Exists() {
req.SetContext(cachepath, "")
return "", nil
}

str := val.String()
req.SetContext(cachepath, str)
return str, nil
return val.String(), nil
}

func ReqBodyJsonSet(req *bfe_basic.Request, path string, value string) error {
Expand Down
4 changes: 4 additions & 0 deletions bfe_http/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ type WriteRequestError struct {
Err error
}

func (e WriteRequestError) Unwrap() error {
return e.Err
}

func (e WriteRequestError) Error() string {
return fmt.Sprintf("WriteRequestError: %s", e.Err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions bfe_modules/mod_ai_token_auth/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func tokenCheck(conf *TokenFile) error {
if conf.UnlimitedQuota && conf.RemainQuota != 0 {
return errors.New("if UnlimitedQuota is true, RemainQuota must be 0")
}
if conf.Models != nil {
if conf.Models != nil && *conf.Models != "" {
conf.models = strings.Split(*conf.Models, ",")
for i := 0; i < len(conf.models); i++ {
conf.models[i] = strings.TrimSpace(conf.models[i])
Expand All @@ -87,7 +87,7 @@ func tokenCheck(conf *TokenFile) error {
}
}
}
if conf.Subnet != nil {
if conf.Subnet != nil && *conf.Subnet != "" {
res := strings.Split(*conf.Subnet, ",")
conf.subnet = make([]*net.IPNet, len(res))
for i := 0; i < len(res); i++ {
Expand Down
2 changes: 1 addition & 1 deletion bfe_modules/mod_ai_token_auth/token_rule_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (m *ModuleAITokenAuth) ValidateUserTokenByReq(req *bfe_basic.Request) (toke
}

if len(token.Models) > 0 {
model, err := condition.ReqBodyJsonFetch(req, "model")
model, err := condition.ReqBodyJsonFetch(req, "model", nil)
if err != nil || model == "" {
return nil, fmt.Errorf("model not found in request body: %v", err)
}
Expand Down
33 changes: 22 additions & 11 deletions bfe_modules/mod_body_process/body_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (e *RejectionError) Error() string {
return e.Message
}

type BPError struct {
Err error
}

func (e *BPError) Unwrap() error {
return e.Err
}

func (e *BPError) Error() string {
return fmt.Sprintf("BodyProcessError: %s", e.Err.Error())
}

type Event interface {
// GetType() string
// GetData() []byte
Expand Down Expand Up @@ -107,7 +119,7 @@ func (bp *BodyProcessor) CreateEventDecoder(fac EventDecoderFac) {
// defer bp.mu.Unlock()
dec, err := fac(bp.source)
if err != nil {
bp.err = fmt.Errorf("create event decoder: %w", err)
bp.err = &BPError{fmt.Errorf("create event decoder: %w", err)}
return
}
bp.decoder = dec
Expand All @@ -118,7 +130,7 @@ func (bp *BodyProcessor) CreateEventEncoder(fac EventEncoderFac) {
// defer bp.mu.Unlock()
enc, err := fac(bp.buffer)
if err != nil {
bp.err = fmt.Errorf("create event encoder: %w", err)
bp.err = &BPError{fmt.Errorf("create event encoder: %w", err)}
return
}
bp.encoder = enc
Expand Down Expand Up @@ -166,8 +178,8 @@ func (bp *BodyProcessor) fillBuffer() error {
for {
events, decodeErr := bp.decoder.Decode()
if decodeErr != nil {
bp.err = decodeErr
return decodeErr
bp.err = &BPError{decodeErr}
return bp.err
}
if len(events) == 0 {
bp.err = io.EOF
Expand All @@ -182,20 +194,20 @@ func (bp *BodyProcessor) fillBuffer() error {
var processErr error
events, processErr = processor.Process(events)
if processErr != nil {
bp.err = processErr
bp.err = &BPError{processErr}
// 检查是否为中断错误
if cvErr, ok := processErr.(*RejectionError); ok {
bp.handleRejection(cvErr)
return cvErr
}
return processErr
return bp.err
}
}
// 编码事件
n, encodeErr := bp.encoder.Encode(events)
if encodeErr != nil {
bp.err = encodeErr
return encodeErr
bp.err = &BPError{encodeErr}
return bp.err
}
if n > 0 {
break // 至少有一个事件被处理
Expand Down Expand Up @@ -226,7 +238,6 @@ func (bp *BodyProcessor) Close() error {
// bp.mu.Lock()
// defer bp.mu.Unlock()

bp.buffer.Reset()
return bp.source.Close()
}

Expand Down Expand Up @@ -507,7 +518,7 @@ func (dec *LineDecoder) Decode() ([]Event, error) {
if err == io.EOF {
return []Event{}, nil // 没有更多数据
}
return nil, fmt.Errorf("line decode error: %w", err)
return nil, err
}

type JsonDecoder struct {
Expand All @@ -526,7 +537,7 @@ func (dec *JsonDecoder) Decode() ([]Event, error) {
if err == io.EOF {
return []Event{}, nil // 没有更多数据
}
return nil, fmt.Errorf("json decode error: %w", err)
return nil, err
}
re := RawEvent(event)
return []Event{&re}, nil
Expand Down
27 changes: 24 additions & 3 deletions bfe_modules/mod_body_process/body_process_rule_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,27 @@ type BodyProcessConfig struct {
}

func BodyProcessConfigCheck(config *BodyProcessConfig) error {
if config == nil {
return nil
}

if config.Dec != "" && config.Dec != "line" && config.Dec != "json" && config.Dec != "sse" {
return errors.New("invalid Dec")
}

if config.Enc != "" && config.Enc != "line" && config.Enc != "json" && config.Enc != "sse" {
return errors.New("invalid Enc")
}

for _, proc := range config.Proc {
if proc.Name != "textfilter" {
return errors.New("invalid Proc name")
}
if len(proc.Params) != 1 {
return errors.New("invalid Proc params, textfilter need only 1 param")
}
}

return nil
}

Expand Down Expand Up @@ -72,9 +93,9 @@ func processRuleCheck(conf processRuleFile) error {
return errors.New("no Cond")
}

if conf.RequestProcess == nil && conf.ResponseProcess == nil {
return errors.New("no RequestProcess or ResponseProcess")
}
// if conf.RequestProcess == nil && conf.ResponseProcess == nil {
// return errors.New("no RequestProcess or ResponseProcess")
// }

if err := BodyProcessConfigCheck(conf.RequestProcess); err != nil {
return err
Expand Down
35 changes: 27 additions & 8 deletions bfe_server/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package bfe_server

import (
"crypto/tls"
"fmt"
"errors"
"io"
"net"
"reflect"
Expand All @@ -46,6 +46,7 @@ import (
"github.com/bfenetworks/bfe/bfe_http2"
"github.com/bfenetworks/bfe/bfe_module"
"github.com/bfenetworks/bfe/bfe_modules/mod_ai_token_auth"
"github.com/bfenetworks/bfe/bfe_modules/mod_body_process"
"github.com/bfenetworks/bfe/bfe_route"
"github.com/bfenetworks/bfe/bfe_route/bfe_cluster"
"github.com/bfenetworks/bfe/bfe_spdy"
Expand Down Expand Up @@ -440,6 +441,17 @@ func (p *ReverseProxy) clusterInvoke(srv *BfeServer, cluster *bfe_cluster.BfeClu
backend.OnFailByCluster(cluster)

case bfe_http.WriteRequestError, bfe_fcgi.WriteRequestError:
var be *mod_body_process.BPError
if errors.As(err, &be) {
// body process error, no retry
request.ErrCode = bfe_basic.ErrBkBodyProcess
request.ErrMsg = err.Error()
p.proxyState.ErrBkBodyProcess.Inc(1)
allowRetry = false
action = closeAfterReply
break
}

request.ErrCode = bfe_basic.ErrBkWriteRequest
request.ErrMsg = err.Error()
p.proxyState.ErrBkWriteRequest.Inc(1)
Expand Down Expand Up @@ -634,8 +646,8 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic
var outreq *bfe_http.Request
var serverConf *bfe_route.ServerDataConf
var writeTimer *time.Timer
var bf BufferFiller
var ok bool
// var bf BufferFiller
// var ok bool

req := basicReq.HttpRequest
isRedirect := false
Expand Down Expand Up @@ -665,7 +677,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
goto send_response
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
Expand Down Expand Up @@ -705,7 +717,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
goto send_response
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
Expand Down Expand Up @@ -808,19 +820,26 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic
mod_ai_token_auth.SetApiKey(outreq, *cluster.AIConf.Key)
}
if cluster.AIConf.ModelMapping != nil {
model, err := condition.ReqBodyJsonFetch(basicReq, "model")
model, err := condition.ReqBodyJsonFetch(basicReq, "model", outreq)
if err == nil && model != "" {
newModel, ok := (*cluster.AIConf.ModelMapping)[model]
if ok {
err = condition.ReqBodyJsonSet(basicReq, "model", newModel)
if err != nil {
log.Logger.Warn("Failed to set model in request body: %s", err)
// just continue, not return error
} else {
// outreq body already changed, need reset Content-Length
if outreq.ContentLength >= 0 {
outreq.ContentLength = -1
outreq.Header.Del("Content-Length")
}
}
}
}
}
}
/*
// do body process before forwarding
bf, ok = outreq.Body.(BufferFiller)
if ok {
Expand All @@ -841,7 +860,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic
goto send_response
}
}
*/
// invoke cluster to get response
res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw)
basicReq.HttpResponse = res
Expand Down Expand Up @@ -895,7 +914,7 @@ response_got:
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
goto send_response
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
Expand Down