Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(venom): user executors performance issues #831

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dump.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func WithFormatterLowerFirstKey() dump.KeyFormatterFunc {
return func(s string, level int) string {
if level == 0 && strings.Contains(s, ".") {
pos := strings.Index(s, ".")
return strings.ToLower(s[0:pos])+s[pos:]
return strings.ToLower(s[0:pos]) + s[pos:]
}
if level == 0 {
return strings.ToLower(f(s, level))
55 changes: 31 additions & 24 deletions executors/exec/exec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package exec

import (
"bufio"
"context"
"fmt"
"os"
@@ -161,42 +160,50 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
return nil, fmt.Errorf("runScriptAction: Cannot get stderr pipe: %s", err)
}

stdoutreader := bufio.NewReader(stdout)
stderrreader := bufio.NewReader(stderr)

result := Result{}
outchan := make(chan bool)

go func() {
var sb strings.Builder
sb.Grow(1024 * 1024) // Pre-allocate 1MB

// For efficiency, read in larger chunks
buf := make([]byte, 64*1024) // 64KB buffer
for {
line, errs := stdoutreader.ReadString('\n')
if errs != nil {
// ReadString returns what has been read even though an error was encountered
// ie. capture outputs with no '\n' at the end
result.Systemout += line
stdout.Close()
close(outchan)
return
n, err := stdout.Read(buf)
if n > 0 {
sb.Write(buf[:n])
}
if err != nil {
break
}
result.Systemout += line
venom.Debug(ctx, venom.HideSensitive(ctx, line))
}

result.Systemout = sb.String()
close(outchan)
}()

errchan := make(chan bool)
go func() {
var sb strings.Builder
sb.Grow(64 * 1024) // Pre-allocate 64KB for stderr

buf := make([]byte, 8*1024) // 8KB buffer for stderr
for {
line, errs := stderrreader.ReadString('\n')
if errs != nil {
// ReadString returns what has been read even though an error was encountered
// ie. capture outputs with no '\n' at the end
result.Systemerr += line
stderr.Close()
close(errchan)
return
n, err := stderr.Read(buf)
if n > 0 {
chunk := buf[:n]
sb.Write(chunk)
venom.Debug(ctx, venom.HideSensitive(ctx, string(chunk)))
}
if err != nil {
break
}
result.Systemerr += line
venom.Debug(ctx, venom.HideSensitive(ctx, line))
}

result.Systemerr = sb.String()
stderr.Close()
close(errchan)
}()

if err := cmd.Start(); err != nil {
2 changes: 1 addition & 1 deletion executors/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -265,7 +265,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
// invoke the gRPC
err = grpcurl.InvokeRPC(ctx, descSource, cc, e.Service+"/"+e.Method, headers, &handle, rf.Next)
if err != nil {
return nil, err
return nil, fmt.Errorf("grpcurl.InvokeRPC() failed.\nUrl: %q\nService: %q\nMethod: %q\nData:%v: %v", e.URL, e.Service, e.Method, e.Data, err)
}

elapsed := time.Since(start)
24 changes: 16 additions & 8 deletions log.go
Original file line number Diff line number Diff line change
@@ -44,17 +44,25 @@ func asJsonString(i interface{}) string {
// HideSensitive replace the value with __hidden__
func HideSensitive(ctx context.Context, arg interface{}) string {
s := ctx.Value(ContextKey("secrets"))

// Fast path: if no secrets to hide, avoid unnecessary string conversion
if s == nil {
if str, ok := arg.(string); ok {
return str
}
return fmt.Sprint(arg)
}
cleanVars := fmt.Sprint(arg)
if s != nil {
switch reflect.TypeOf(s).Kind() {
case reflect.Slice:
secrets := reflect.ValueOf(s)
for i := 0; i < secrets.Len(); i++ {
secret := fmt.Sprint(secrets.Index(i).Interface())
cleanVars = strings.ReplaceAll(cleanVars, secret, "__hidden__")
}

switch reflect.TypeOf(s).Kind() {
case reflect.Slice:
secrets := reflect.ValueOf(s)
for i := 0; i < secrets.Len(); i++ {
secret := fmt.Sprint(secrets.Index(i).Interface())
cleanVars = strings.ReplaceAll(cleanVars, secret, "__hidden__")
}
}

return cleanVars
}

5 changes: 5 additions & 0 deletions process.go
Original file line number Diff line number Diff line change
@@ -89,6 +89,11 @@ func (v *Venom) Parse(ctx context.Context, path []string) error {
return err
}

err = v.registerUserExecutors(ctx)
if err != nil {
return errors.Wrapf(err, "unable to register user executors")
}

missingVars := []string{}
extractedVars := []string{}
for i := range v.Tests.TestSuites {
21 changes: 3 additions & 18 deletions read_partial.go
Original file line number Diff line number Diff line change
@@ -11,24 +11,6 @@ import (
"github.com/rockbears/yaml"
)

func getUserExecutorInputYML(ctx context.Context, btesIn []byte) (H, error) {
btes := readPartialYML(btesIn, "input")

var result = map[string]interface{}{}
var tmpResult = map[string]interface{}{}

if len(btes) > 0 {
if err := yaml.Unmarshal([]byte(btes), &tmpResult); err != nil {
return nil, err
}
}
for k, v := range tmpResult {
result[k] = v
}

return result, nil
}

func getVarFromPartialYML(ctx context.Context, btesIn []byte) (H, error) {
btes := readPartialYML(btesIn, "vars")
type partialVars struct {
@@ -52,6 +34,9 @@ func readPartialYML(btes []byte, attribute string) string {
var record bool
for scanner.Scan() {
line := scanner.Text()
line = strings.TrimFunc(line, func(r rune) bool {
return !unicode.IsGraphic(r)
})
if strings.HasPrefix(line, attribute+":") {
record = true
} else if len(line) > 0 {
5 changes: 2 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
"maps"
"strings"
"time"
"unicode"
@@ -25,9 +26,7 @@ const (
type H map[string]interface{}

func (h H) Clone() H {
var h2 = make(H, len(h))
h2.AddAll(h)
return h2
return maps.Clone(h)
}

func (h *H) Add(k string, v interface{}) {
95 changes: 71 additions & 24 deletions types_executor.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"strings"

"github.com/gosimple/slug"
@@ -168,11 +169,13 @@ func GetExecutorResult(r interface{}) map[string]interface{} {
}

type UserExecutor struct {
Executor string `json:"executor" yaml:"executor"`
Input H `json:"input" yaml:"input"`
RawTestSteps []json.RawMessage `json:"steps" yaml:"steps"`
Output json.RawMessage `json:"output" yaml:"output"`
Filename string `json:"-" yaml:"-"`
Executor string
Input H `json:"input" yaml:"input"`
TestSteps []json.RawMessage `json:"steps" yaml:"steps"`
Raw []byte `json:"-" yaml:"-"` // the raw file content of the executor
RawInputs []byte `json:"-" yaml:"-"`
Filename string `json:"-" yaml:"-"`
Output json.RawMessage `json:"output" yaml:"output"`
}

// Run is not implemented on user executor
@@ -202,34 +205,66 @@ func (ux UserExecutor) ZeroValueResult() interface{} {

func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn *TestCase, tsIn *TestStepResult, step TestStep) (interface{}, error) {
vrs := tcIn.TestSuiteVars.Clone()
uxIn := runner.GetExecutor().(UserExecutor)

for k, va := range uxIn.Input {
if strings.HasPrefix(k, "input.") {
// do not reinject input.vars from parent user executor if exists
continue
} else if !strings.HasPrefix(k, "venom") {
if vl, ok := step[k]; ok && vl != "" { // value from step
vrs.AddWithPrefix("input", k, vl)
} else { // default value from executor
vrs.AddWithPrefix("input", k, va)
ux := runner.GetExecutor().(UserExecutor)
var tsVars map[string]string
newUX := UserExecutor{}
var err error

// process inputs
if len(ux.RawInputs) != 0 {
tsVars, err = DumpString(vrs)
if err != nil {
return nil, errors.Wrapf(err, "error processing executor inputs: unable to dump testsuite vars")
}

interpolatedInput, err := interpolate.Do(string(ux.RawInputs), tsVars)
if err != nil {
return nil, errors.Wrapf(err, "unable to interpolate executor inputs %q", ux.Executor)
}

err = yaml.Unmarshal([]byte(interpolatedInput), &newUX)
if err != nil {
return nil, errors.Wrapf(err, "unable to unmarshal inputs for executor %q - raw interpolated:\n%v", ux.Executor, string(interpolatedInput))
}

for k, va := range newUX.Input {
if strings.HasPrefix(k, "input.") {
// do not reinject input.vars from parent user executor if exists
continue
} else if !strings.HasPrefix(k, "venom") {
if vl, ok := step[k]; ok && vl != "" { // value from step
vrs.AddWithPrefix("input", k, vl)
} else { // default value from executor
vrs.AddWithPrefix("input", k, va)
}
} else {
vrs.Add(k, va)
}
} else {
vrs.Add(k, va)
}
tsVars, err = DumpString(vrs)
if err != nil {
return nil, errors.Wrapf(err, "error processing executor inputs: unable to dump testsuite vars")
}
}
// reload the user executor with the interpolated vars
_, exe, err := v.GetExecutorRunner(ctx, step, vrs)

interpolatedFull, err := interpolate.Do(string(ux.Raw), tsVars)
if err != nil {
return nil, errors.Wrapf(err, "unable to interpolate executor %q", ux.Executor)
}
// quote any remaining template expressions to ensure proper YAML parsing
sanitized := quoteTemplateExpressions([]byte(interpolatedFull))

err = yaml.Unmarshal([]byte(sanitized), &newUX)
if err != nil {
return nil, errors.Wrapf(err, "unable to reload executor")
return nil, errors.Wrapf(err, "unable to unmarshal executor %q - raw interpolated :\n%v", ux.Executor, string(sanitized))
}
ux := exe.GetExecutor().(UserExecutor)
ux.Output = newUX.Output

tc := &TestCase{
TestCaseInput: TestCaseInput{
Name: ux.Executor,
RawTestSteps: ux.RawTestSteps,
Vars: vrs,
RawTestSteps: newUX.TestSteps,
},
number: tcIn.number,
TestSuiteVars: tcIn.TestSuiteVars,
@@ -283,7 +318,7 @@ func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn
}

if len(tsIn.Errors) > 0 {
return outputResult, fmt.Errorf("failed")
return outputResult, fmt.Errorf("executor %q failed - raw interpolated:\n%v", ux.Executor, string(sanitized))
}

// here, we have the user executor results.
@@ -332,3 +367,15 @@ func (v *Venom) RunUserExecutor(ctx context.Context, runner ExecutorRunner, tcIn
}
return result, nil
}

// quoteTemplateExpressions adds double quotes around template expressions in YAML content.
// It specifically targets expressions that follow a colon and whitespace like 'key: {{.variable}}'
// and are not already enclosed in quotes. This ensures proper YAML parsing of template variables.
func quoteTemplateExpressions(content []byte) []byte {
// First capture group matches everything up to the colon, checking the last non-whitespace
// character isn't a quote (to skip JSON keys)
re := regexp.MustCompile(`(?m)(^.*[^"\s][\s]*)(:\s+)({{.*?}})(.*?)(?:\s*)$`)

// Put quotes around the template expression and what follows it
return re.ReplaceAll(content, []byte(`$1$2"$3$4"`))
}
141 changes: 65 additions & 76 deletions venom.go
Original file line number Diff line number Diff line change
@@ -10,14 +10,13 @@ import (
"path"
"path/filepath"
"plugin"
"reflect"
"sort"
"strings"

"github.com/confluentinc/bincover"
"github.com/fatih/color"
"github.com/ovh/cds/sdk/interpolate"
"github.com/pkg/errors"
"github.com/rockbears/yaml"
"github.com/spf13/cast"
)

@@ -42,27 +41,25 @@ type ContextKey string
// New instantiates a new venom on venom run cmd
func New() *Venom {
v := &Venom{
LogOutput: os.Stdout,
PrintFunc: fmt.Printf,
executorsBuiltin: map[string]Executor{},
executorsPlugin: map[string]Executor{},
executorsUser: map[string]Executor{},
executorFileCache: map[string][]byte{},
variables: map[string]interface{}{},
secrets: map[string]interface{}{},
OutputFormat: "xml",
LogOutput: os.Stdout,
PrintFunc: fmt.Printf,
executorsBuiltin: map[string]Executor{},
executorsPlugin: map[string]Executor{},
executorsUser: map[string]Executor{},
variables: map[string]interface{}{},
secrets: map[string]interface{}{},
OutputFormat: "xml",
}
return v
}

type Venom struct {
LogOutput io.Writer

PrintFunc func(format string, a ...interface{}) (n int, err error)
executorsBuiltin map[string]Executor
executorsPlugin map[string]Executor
executorsUser map[string]Executor
executorFileCache map[string][]byte
PrintFunc func(format string, a ...interface{}) (n int, err error)
executorsBuiltin map[string]Executor
executorsPlugin map[string]Executor
executorsUser map[string]Executor

Tests Tests
variables H
@@ -116,13 +113,18 @@ func (v *Venom) RegisterExecutorPlugin(name string, e Executor) {
v.executorsPlugin[name] = e
}

// RegisterExecutorUser register User sxecutors
func (v *Venom) RegisterExecutorUser(name string, e Executor) {
// RegisterExecutorUser registers an user executor
func (v *Venom) RegisterExecutorUser(name string, e Executor) error {
if existing, ok := v.executorsUser[name]; ok {
return fmt.Errorf("executor %q already exists (from file %q)", name, existing.(UserExecutor).Filename)
}

v.executorsUser[name] = e
return nil
}

// GetExecutorRunner initializes a test by name
// no type -> exec is default
// GetExecutorRunner initializes a test according to its type
// if no type is provided, exec is default
func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (context.Context, ExecutorRunner, error) {
name, _ := ts.StringValue("type")
script, _ := ts.StringValue("script")
@@ -168,10 +170,6 @@ func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (contex
return ctx, newExecutorRunner(ex, name, "builtin", retry, retryIf, delay, timeout, info), nil
}

if err := v.registerUserExecutors(ctx, name, vars); err != nil {
Debug(ctx, "executor %q is not implemented as user executor - err:%v", name, err)
}

if ex, ok := v.executorsUser[name]; ok {
return ctx, newExecutorRunner(ex, name, "user", retry, retryIf, delay, timeout, info), nil
}
@@ -181,100 +179,91 @@ func (v *Venom) GetExecutorRunner(ctx context.Context, ts TestStep, h H) (contex
}

// then add the executor plugin to the map to not have to load it on each step
if ex, ok := v.executorsUser[name]; ok {
if ex, ok := v.executorsPlugin[name]; ok {
return ctx, newExecutorRunner(ex, name, "plugin", retry, retryIf, delay, timeout, info), nil
}
return ctx, nil, fmt.Errorf("executor %q is not implemented", name)
return ctx, nil, fmt.Errorf("user executor %q not found - loaded executors are: %v", name, reflect.ValueOf(v.executorsUser).MapKeys())
}

func (v *Venom) getUserExecutorFilesPath(vars map[string]string) (filePaths []string, err error) {
func (v *Venom) getUserExecutorFilesPath(ctx context.Context, vars map[string]string) []string {
var libpaths []string
if v.LibDir != "" {
p := strings.Split(v.LibDir, string(os.PathListSeparator))
libpaths = append(libpaths, p...)
}
libpaths = append(libpaths, path.Join(vars["venom.testsuite.workdir"], "lib"))

//use a map to avoid duplicates
filepaths := make(map[string]bool)

for _, p := range libpaths {
p = strings.TrimSpace(p)

err = filepath.Walk(p, func(fp string, f os.FileInfo, err error) error {
err := filepath.Walk(p, func(fp string, f os.FileInfo, err error) error {
switch ext := filepath.Ext(fp); ext {
case ".yml", ".yaml":
filePaths = append(filePaths, fp)
filepaths[fp] = true
}
return nil
})
if err != nil {
return nil, err
Warn(ctx, "Unable to list files in lib directory %q: %v", p, err)
}
}

sort.Strings(filePaths)
if len(filePaths) == 0 {
return nil, fmt.Errorf("no user executor yml file selected")
userExecutorFiles := make([]string, len(filepaths))
i := 0
for p := range filepaths {
userExecutorFiles[i] = p
i++
}

sort.Strings(userExecutorFiles)
if len(userExecutorFiles) == 0 {
Warn(ctx, "no user executor yml file selected")
}
return filePaths, nil
return userExecutorFiles
}

func (v *Venom) registerUserExecutors(ctx context.Context, name string, vars map[string]string) error {
executorsPath, err := v.getUserExecutorFilesPath(vars)
func (v *Venom) registerUserExecutors(ctx context.Context) error {
vars, err := DumpStringPreserveCase(v.variables)
if err != nil {
return err
return errors.Wrapf(err, "unable to parse variables")
}

executorsPath := v.getUserExecutorFilesPath(ctx, vars)

for _, f := range executorsPath {
Info(ctx, "Reading %v", f)
btes, ok := v.executorFileCache[f]
if !ok {
btes, err = os.ReadFile(f)
if err != nil {
return errors.Wrapf(err, "unable to read file %q", f)
}
v.executorFileCache[f] = btes
}

varsFromInput, err := getUserExecutorInputYML(ctx, btes)
content, err := os.ReadFile(f)
if err != nil {
return err
return errors.Wrapf(err, "unable to read file %q", f)
}

// varsFromInput contains the default vars from the executor
var varsFromInputMap map[string]string
if len(varsFromInput) > 0 {
varsFromInputMap, err = DumpStringPreserveCase(varsFromInput)
if err != nil {
return errors.Wrapf(err, "unable to parse variables")
}
ex := readPartialYML(content, "executor")
if len(ex) == 0 {
return errors.Errorf("missing key 'executor' in %q", f)
}

varsComputed := map[string]string{}
for k, v := range vars {
varsComputed[k] = v
}
for k, v := range varsFromInputMap {
// we only take vars from varsFromInputMap if it's not already exist in vars from teststep vars
if _, ok := vars[k]; !ok {
varsComputed[k] = v
}
}
name := strings.Replace(ex, "executor:", "", 1)
name = strings.TrimSpace(name)

content, err := interpolate.Do(string(btes), varsComputed)
if err != nil {
return err
}
inputs := readPartialYML(content, "input")

ux := UserExecutor{Filename: f}
if err := yaml.Unmarshal([]byte(content), &ux); err != nil {
return errors.Wrapf(err, "unable to parse file %q with content %v", f, content)
ux := UserExecutor{
Filename: f,
Executor: name,
RawInputs: []byte(inputs),
Raw: content,
}

for k, vr := range varsComputed {
ux.Input.Add(k, vr)
err = v.RegisterExecutorUser(ux.Executor, ux)
if err != nil {
return errors.Wrapf(err, "unable to register user executor %q from file %q", ux.Executor, f)
}

v.RegisterExecutorUser(ux.Executor, ux)
Info(ctx, "User executor %q registered", ux.Executor)
}

return nil
}