From ed1d7c01a42f0c79e7347cb66f3296b8e5d7e1b4 Mon Sep 17 00:00:00 2001 From: alexGNX Date: Tue, 4 Mar 2025 12:47:38 +0100 Subject: [PATCH] fix(venom): user executors performance issues Signed-off-by: alexGNX --- dump.go | 2 +- executors/exec/exec.go | 55 +++++++++------- executors/grpc/grpc.go | 2 +- log.go | 24 ++++--- process.go | 5 ++ read_partial.go | 21 +----- types.go | 5 +- types_executor.go | 95 ++++++++++++++++++++------- venom.go | 141 +++++++++++++++++++---------------------- 9 files changed, 195 insertions(+), 155 deletions(-) diff --git a/dump.go b/dump.go index 56bf1c4d..4e66e220 100644 --- a/dump.go +++ b/dump.go @@ -95,7 +95,7 @@ func WithFormatterLowerFirstKey() dump.KeyFormatterFunc { return func(s string, level int) string { if level == 0 && strings.Contains(s, ".") { pos := strings.Index(s, ".") - return strings.ToLower(s[0:pos])+s[pos:] + return strings.ToLower(s[0:pos]) + s[pos:] } if level == 0 { return strings.ToLower(f(s, level)) diff --git a/executors/exec/exec.go b/executors/exec/exec.go index dddde645..bb89a0a1 100644 --- a/executors/exec/exec.go +++ b/executors/exec/exec.go @@ -1,7 +1,6 @@ package exec import ( - "bufio" "context" "fmt" "os" @@ -161,42 +160,50 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro return nil, fmt.Errorf("runScriptAction: Cannot get stderr pipe: %s", err) } - stdoutreader := bufio.NewReader(stdout) - stderrreader := bufio.NewReader(stderr) - result := Result{} outchan := make(chan bool) + go func() { + var sb strings.Builder + sb.Grow(1024 * 1024) // Pre-allocate 1MB + + // For efficiency, read in larger chunks + buf := make([]byte, 64*1024) // 64KB buffer for { - line, errs := stdoutreader.ReadString('\n') - if errs != nil { - // ReadString returns what has been read even though an error was encountered - // ie. capture outputs with no '\n' at the end - result.Systemout += line - stdout.Close() - close(outchan) - return + n, err := stdout.Read(buf) + if n > 0 { + sb.Write(buf[:n]) + } + if err != nil { + break } - result.Systemout += line - venom.Debug(ctx, venom.HideSensitive(ctx, line)) } + + result.Systemout = sb.String() + close(outchan) }() errchan := make(chan bool) go func() { + var sb strings.Builder + sb.Grow(64 * 1024) // Pre-allocate 64KB for stderr + + buf := make([]byte, 8*1024) // 8KB buffer for stderr for { - line, errs := stderrreader.ReadString('\n') - if errs != nil { - // ReadString returns what has been read even though an error was encountered - // ie. capture outputs with no '\n' at the end - result.Systemerr += line - stderr.Close() - close(errchan) - return + n, err := stderr.Read(buf) + if n > 0 { + chunk := buf[:n] + sb.Write(chunk) + venom.Debug(ctx, venom.HideSensitive(ctx, string(chunk))) + } + if err != nil { + break } - result.Systemerr += line - venom.Debug(ctx, venom.HideSensitive(ctx, line)) } + + result.Systemerr = sb.String() + stderr.Close() + close(errchan) }() if err := cmd.Start(); err != nil { diff --git a/executors/grpc/grpc.go b/executors/grpc/grpc.go index cefb568f..39a79e43 100644 --- a/executors/grpc/grpc.go +++ b/executors/grpc/grpc.go @@ -265,7 +265,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro // invoke the gRPC err = grpcurl.InvokeRPC(ctx, descSource, cc, e.Service+"/"+e.Method, headers, &handle, rf.Next) if err != nil { - return nil, err + return nil, fmt.Errorf("grpcurl.InvokeRPC() failed.\nUrl: %q\nService: %q\nMethod: %q\nData:%v: %v", e.URL, e.Service, e.Method, e.Data, err) } elapsed := time.Since(start) diff --git a/log.go b/log.go index 13233857..73b46b67 100644 --- a/log.go +++ b/log.go @@ -44,17 +44,25 @@ func asJsonString(i interface{}) string { // HideSensitive replace the value with __hidden__ func HideSensitive(ctx context.Context, arg interface{}) string { s := ctx.Value(ContextKey("secrets")) + + // Fast path: if no secrets to hide, avoid unnecessary string conversion + if s == nil { + if str, ok := arg.(string); ok { + return str + } + return fmt.Sprint(arg) + } cleanVars := fmt.Sprint(arg) - if s != nil { - switch reflect.TypeOf(s).Kind() { - case reflect.Slice: - secrets := reflect.ValueOf(s) - for i := 0; i < secrets.Len(); i++ { - secret := fmt.Sprint(secrets.Index(i).Interface()) - cleanVars = strings.ReplaceAll(cleanVars, secret, "__hidden__") - } + + switch reflect.TypeOf(s).Kind() { + case reflect.Slice: + secrets := reflect.ValueOf(s) + for i := 0; i < secrets.Len(); i++ { + secret := fmt.Sprint(secrets.Index(i).Interface()) + cleanVars = strings.ReplaceAll(cleanVars, secret, "__hidden__") } } + return cleanVars } diff --git a/process.go b/process.go index 15e33d32..79f4dd4e 100644 --- a/process.go +++ b/process.go @@ -89,6 +89,11 @@ func (v *Venom) Parse(ctx context.Context, path []string) error { return err } + err = v.registerUserExecutors(ctx) + if err != nil { + return errors.Wrapf(err, "unable to register user executors") + } + missingVars := []string{} extractedVars := []string{} for i := range v.Tests.TestSuites { diff --git a/read_partial.go b/read_partial.go index 6a09e8e3..84c82784 100644 --- a/read_partial.go +++ b/read_partial.go @@ -11,24 +11,6 @@ import ( "github.com/rockbears/yaml" ) -func getUserExecutorInputYML(ctx context.Context, btesIn []byte) (H, error) { - btes := readPartialYML(btesIn, "input") - - var result = map[string]interface{}{} - var tmpResult = map[string]interface{}{} - - if len(btes) > 0 { - if err := yaml.Unmarshal([]byte(btes), &tmpResult); err != nil { - return nil, err - } - } - for k, v := range tmpResult { - result[k] = v - } - - return result, nil -} - func getVarFromPartialYML(ctx context.Context, btesIn []byte) (H, error) { btes := readPartialYML(btesIn, "vars") type partialVars struct { @@ -52,6 +34,9 @@ func readPartialYML(btes []byte, attribute string) string { var record bool for scanner.Scan() { line := scanner.Text() + line = strings.TrimFunc(line, func(r rune) bool { + return !unicode.IsGraphic(r) + }) if strings.HasPrefix(line, attribute+":") { record = true } else if len(line) > 0 { diff --git a/types.go b/types.go index d48d9aad..e1ce63e1 100644 --- a/types.go +++ b/types.go @@ -5,6 +5,7 @@ import ( "encoding/json" "encoding/xml" "fmt" + "maps" "strings" "time" "unicode" @@ -25,9 +26,7 @@ const ( type H map[string]interface{} func (h H) Clone() H { - var h2 = make(H, len(h)) - h2.AddAll(h) - return h2 + return maps.Clone(h) } func (h *H) Add(k string, v interface{}) { diff --git a/types_executor.go b/types_executor.go index 7d458976..e9ae98b7 100644 --- a/types_executor.go +++ b/types_executor.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "reflect" + "regexp" "strings" "github.com/gosimple/slug" @@ -168,11 +169,13 @@ func GetExecutorResult(r interface{}) map[string]interface{} { } type UserExecutor struct { - Executor string `json:"executor" yaml:"executor"` - Input H `json:"input" yaml:"input"` - RawTestSteps []json.RawMessage `json:"steps" yaml:"steps"` - Output json.RawMessage `json:"output" yaml:"output"` - Filename string `json:"-" yaml:"-"` + Executor string + Input H `json:"input" yaml:"input"` + TestSteps []json.RawMessage `json:"steps" yaml:"steps"` + Raw []byte `json:"-" yaml:"-"` // the raw file content of the executor + RawInputs []byte `json:"-" yaml:"-"` + Filename string `json:"-" yaml:"-"` + Output json.RawMessage `json:"output" yaml:"output"` } // Run is not implemented on user executor @@ -202,34 +205,66 @@ func (ux UserExecutor) ZeroValueResult() interface{} { func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn *TestCase, tsIn *TestStepResult, step TestStep) (interface{}, error) { vrs := tcIn.TestSuiteVars.Clone() - uxIn := runner.GetExecutor().(UserExecutor) - - for k, va := range uxIn.Input { - if strings.HasPrefix(k, "input.") { - // do not reinject input.vars from parent user executor if exists - continue - } else if !strings.HasPrefix(k, "venom") { - if vl, ok := step[k]; ok && vl != "" { // value from step - vrs.AddWithPrefix("input", k, vl) - } else { // default value from executor - vrs.AddWithPrefix("input", k, va) + ux := runner.GetExecutor().(UserExecutor) + var tsVars map[string]string + newUX := UserExecutor{} + var err error + + // process inputs + if len(ux.RawInputs) != 0 { + tsVars, err = DumpString(vrs) + if err != nil { + return nil, errors.Wrapf(err, "error processing executor inputs: unable to dump testsuite vars") + } + + interpolatedInput, err := interpolate.Do(string(ux.RawInputs), tsVars) + if err != nil { + return nil, errors.Wrapf(err, "unable to interpolate executor inputs %q", ux.Executor) + } + + err = yaml.Unmarshal([]byte(interpolatedInput), &newUX) + if err != nil { + return nil, errors.Wrapf(err, "unable to unmarshal inputs for executor %q - raw interpolated:\n%v", ux.Executor, string(interpolatedInput)) + } + + for k, va := range newUX.Input { + if strings.HasPrefix(k, "input.") { + // do not reinject input.vars from parent user executor if exists + continue + } else if !strings.HasPrefix(k, "venom") { + if vl, ok := step[k]; ok && vl != "" { // value from step + vrs.AddWithPrefix("input", k, vl) + } else { // default value from executor + vrs.AddWithPrefix("input", k, va) + } + } else { + vrs.Add(k, va) } - } else { - vrs.Add(k, va) + } + tsVars, err = DumpString(vrs) + if err != nil { + return nil, errors.Wrapf(err, "error processing executor inputs: unable to dump testsuite vars") } } - // reload the user executor with the interpolated vars - _, exe, err := v.GetExecutorRunner(ctx, step, vrs) + + interpolatedFull, err := interpolate.Do(string(ux.Raw), tsVars) + if err != nil { + return nil, errors.Wrapf(err, "unable to interpolate executor %q", ux.Executor) + } + // quote any remaining template expressions to ensure proper YAML parsing + sanitized := quoteTemplateExpressions([]byte(interpolatedFull)) + + err = yaml.Unmarshal([]byte(sanitized), &newUX) if err != nil { - return nil, errors.Wrapf(err, "unable to reload executor") + return nil, errors.Wrapf(err, "unable to unmarshal executor %q - raw interpolated :\n%v", ux.Executor, string(sanitized)) } - ux := exe.GetExecutor().(UserExecutor) + ux.Output = newUX.Output tc := &TestCase{ TestCaseInput: TestCaseInput{ Name: ux.Executor, - RawTestSteps: ux.RawTestSteps, Vars: vrs, + RawTestSteps: newUX.TestSteps, }, number: tcIn.number, TestSuiteVars: tcIn.TestSuiteVars, @@ -283,7 +318,7 @@ func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn } if len(tsIn.Errors) > 0 { - return outputResult, fmt.Errorf("failed") + return outputResult, fmt.Errorf("executor %q failed - raw interpolated:\n%v", ux.Executor, string(sanitized)) } // here, we have the user executor results. @@ -332,3 +367,15 @@ func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn } return result, nil } + +// quoteTemplateExpressions adds double quotes around template expressions in YAML content. +// It specifically targets expressions that follow a colon and whitespace like 'key: {{.variable}}' +// and are not already enclosed in quotes. This ensures proper YAML parsing of template variables. +func quoteTemplateExpressions(content []byte) []byte { + // First capture group matches everything up to the colon, checking the last non-whitespace + // character isn't a quote (to skip JSON keys) + re := regexp.MustCompile(`(?m)(^.*[^"\s][\s]*)(:\s+)({{.*?}})(.*?)(?:\s*)$`) + + // Put quotes around the template expression and what follows it + return re.ReplaceAll(content, []byte(`$1$2"$3$4"`)) +} diff --git a/venom.go b/venom.go index c163a921..c9bb692c 100644 --- a/venom.go +++ b/venom.go @@ -10,14 +10,13 @@ import ( "path" "path/filepath" "plugin" + "reflect" "sort" "strings" "github.com/confluentinc/bincover" "github.com/fatih/color" - "github.com/ovh/cds/sdk/interpolate" "github.com/pkg/errors" - "github.com/rockbears/yaml" "github.com/spf13/cast" ) @@ -42,15 +41,14 @@ type ContextKey string // New instantiates a new venom on venom run cmd func New() *Venom { v := &Venom{ - LogOutput: os.Stdout, - PrintFunc: fmt.Printf, - executorsBuiltin: map[string]Executor{}, - executorsPlugin: map[string]Executor{}, - executorsUser: map[string]Executor{}, - executorFileCache: map[string][]byte{}, - variables: map[string]interface{}{}, - secrets: map[string]interface{}{}, - OutputFormat: "xml", + LogOutput: os.Stdout, + PrintFunc: fmt.Printf, + executorsBuiltin: map[string]Executor{}, + executorsPlugin: map[string]Executor{}, + executorsUser: map[string]Executor{}, + variables: map[string]interface{}{}, + secrets: map[string]interface{}{}, + OutputFormat: "xml", } return v } @@ -58,11 +56,10 @@ func New() *Venom { type Venom struct { LogOutput io.Writer - PrintFunc func(format string, a ...interface{}) (n int, err error) - executorsBuiltin map[string]Executor - executorsPlugin map[string]Executor - executorsUser map[string]Executor - executorFileCache map[string][]byte + PrintFunc func(format string, a ...interface{}) (n int, err error) + executorsBuiltin map[string]Executor + executorsPlugin map[string]Executor + executorsUser map[string]Executor Tests Tests variables H @@ -116,13 +113,18 @@ func (v *Venom) RegisterExecutorPlugin(name string, e Executor) { v.executorsPlugin[name] = e } -// RegisterExecutorUser register User sxecutors -func (v *Venom) RegisterExecutorUser(name string, e Executor) { +// RegisterExecutorUser registers an user executor +func (v *Venom) RegisterExecutorUser(name string, e Executor) error { + if existing, ok := v.executorsUser[name]; ok { + return fmt.Errorf("executor %q already exists (from file %q)", name, existing.(UserExecutor).Filename) + } + v.executorsUser[name] = e + return nil } -// GetExecutorRunner initializes a test by name -// no type -> exec is default +// GetExecutorRunner initializes a test according to its type +// if no type is provided, exec is default func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (context.Context, ExecutorRunner, error) { name, _ := ts.StringValue("type") script, _ := ts.StringValue("script") @@ -168,10 +170,6 @@ func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (contex return ctx, newExecutorRunner(ex, name, "builtin", retry, retryIf, delay, timeout, info), nil } - if err := v.registerUserExecutors(ctx, name, vars); err != nil { - Debug(ctx, "executor %q is not implemented as user executor - err:%v", name, err) - } - if ex, ok := v.executorsUser[name]; ok { return ctx, newExecutorRunner(ex, name, "user", retry, retryIf, delay, timeout, info), nil } @@ -181,13 +179,13 @@ func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (contex } // then add the executor plugin to the map to not have to load it on each step - if ex, ok := v.executorsUser[name]; ok { + if ex, ok := v.executorsPlugin[name]; ok { return ctx, newExecutorRunner(ex, name, "plugin", retry, retryIf, delay, timeout, info), nil } - return ctx, nil, fmt.Errorf("executor %q is not implemented", name) + return ctx, nil, fmt.Errorf("user executor %q not found - loaded executors are: %v", name, reflect.ValueOf(v.executorsUser).MapKeys()) } -func (v *Venom) getUserExecutorFilesPath(vars map[string]string) (filePaths []string, err error) { +func (v *Venom) getUserExecutorFilesPath(ctx context.Context, vars map[string]string) []string { var libpaths []string if v.LibDir != "" { p := strings.Split(v.LibDir, string(os.PathListSeparator)) @@ -195,86 +193,77 @@ func (v *Venom) getUserExecutorFilesPath(vars map[string]string) (filePaths []st } libpaths = append(libpaths, path.Join(vars["venom.testsuite.workdir"], "lib")) + //use a map to avoid duplicates + filepaths := make(map[string]bool) + for _, p := range libpaths { p = strings.TrimSpace(p) - err = filepath.Walk(p, func(fp string, f os.FileInfo, err error) error { + err := filepath.Walk(p, func(fp string, f os.FileInfo, err error) error { switch ext := filepath.Ext(fp); ext { case ".yml", ".yaml": - filePaths = append(filePaths, fp) + filepaths[fp] = true } return nil }) if err != nil { - return nil, err + Warn(ctx, "Unable to list files in lib directory %q: %v", p, err) } } - sort.Strings(filePaths) - if len(filePaths) == 0 { - return nil, fmt.Errorf("no user executor yml file selected") + userExecutorFiles := make([]string, len(filepaths)) + i := 0 + for p := range filepaths { + userExecutorFiles[i] = p + i++ + } + + sort.Strings(userExecutorFiles) + if len(userExecutorFiles) == 0 { + Warn(ctx, "no user executor yml file selected") } - return filePaths, nil + return userExecutorFiles } -func (v *Venom) registerUserExecutors(ctx context.Context, name string, vars map[string]string) error { - executorsPath, err := v.getUserExecutorFilesPath(vars) +func (v *Venom) registerUserExecutors(ctx context.Context) error { + vars, err := DumpStringPreserveCase(v.variables) if err != nil { - return err + return errors.Wrapf(err, "unable to parse variables") } + executorsPath := v.getUserExecutorFilesPath(ctx, vars) + for _, f := range executorsPath { Info(ctx, "Reading %v", f) - btes, ok := v.executorFileCache[f] - if !ok { - btes, err = os.ReadFile(f) - if err != nil { - return errors.Wrapf(err, "unable to read file %q", f) - } - v.executorFileCache[f] = btes - } - - varsFromInput, err := getUserExecutorInputYML(ctx, btes) + content, err := os.ReadFile(f) if err != nil { - return err + return errors.Wrapf(err, "unable to read file %q", f) } - // varsFromInput contains the default vars from the executor - var varsFromInputMap map[string]string - if len(varsFromInput) > 0 { - varsFromInputMap, err = DumpStringPreserveCase(varsFromInput) - if err != nil { - return errors.Wrapf(err, "unable to parse variables") - } + ex := readPartialYML(content, "executor") + if len(ex) == 0 { + return errors.Errorf("missing key 'executor' in %q", f) } - varsComputed := map[string]string{} - for k, v := range vars { - varsComputed[k] = v - } - for k, v := range varsFromInputMap { - // we only take vars from varsFromInputMap if it's not already exist in vars from teststep vars - if _, ok := vars[k]; !ok { - varsComputed[k] = v - } - } + name := strings.Replace(ex, "executor:", "", 1) + name = strings.TrimSpace(name) - content, err := interpolate.Do(string(btes), varsComputed) - if err != nil { - return err - } + inputs := readPartialYML(content, "input") - ux := UserExecutor{Filename: f} - if err := yaml.Unmarshal([]byte(content), &ux); err != nil { - return errors.Wrapf(err, "unable to parse file %q with content %v", f, content) + ux := UserExecutor{ + Filename: f, + Executor: name, + RawInputs: []byte(inputs), + Raw: content, } - for k, vr := range varsComputed { - ux.Input.Add(k, vr) + err = v.RegisterExecutorUser(ux.Executor, ux) + if err != nil { + return errors.Wrapf(err, "unable to register user executor %q from file %q", ux.Executor, f) } - - v.RegisterExecutorUser(ux.Executor, ux) + Info(ctx, "User executor %q registered", ux.Executor) } + return nil }