Skip to content

Allow packages without service deployer to have system tests #1768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 58 additions & 37 deletions internal/agentdeployer/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,11 @@ type FactoryOptions struct {
// Factory chooses the appropriate service runner for the given data stream, depending
// on service configuration files defined in the package or data stream.
func Factory(options FactoryOptions) (AgentDeployer, error) {
devDeployPath, err := FindDevDeployPath(options)
if err != nil {
return nil, fmt.Errorf("can't find \"%s\" directory: %w", options.DevDeployDir, err)
}

agentDeployerName, err := findAgentDeployer(devDeployPath)
agentDeployerName, agentDeployerPath, err := selectAgentDeployerType(options)
if err != nil {
logger.Debugf("Not found any agent deployer, using default one")
agentDeployerName = "default"
}
// if package defines `_dev/deploy/docker` folder to start their services, it should be
// using the default agent deployer`
if agentDeployerName == "docker" || agentDeployerName == "tf" {
agentDeployerName = "default"
return nil, fmt.Errorf("failed to select agent deployer type: %w", err)
}

agentDeployerPath := filepath.Join(devDeployPath, agentDeployerName)

switch agentDeployerName {
case "default":
if options.Type != TypeTest {
Expand All @@ -80,46 +67,79 @@ func Factory(options FactoryOptions) (AgentDeployer, error) {
// FIXME: this docker-compose scenario contains both agent and service
return nil, nil
case "k8s":
if _, err := os.Stat(agentDeployerPath); err == nil {
opts := KubernetesAgentDeployerOptions{
Profile: options.Profile,
DefinitionsDir: agentDeployerPath,
StackVersion: options.StackVersion,
PolicyName: options.PolicyName,
DataStream: options.DataStream,
RunSetup: options.RunSetup,
RunTestsOnly: options.RunTestsOnly,
RunTearDown: options.RunTearDown,
}
return NewKubernetesAgentDeployer(opts)
opts := KubernetesAgentDeployerOptions{
Profile: options.Profile,
DefinitionsDir: agentDeployerPath,
StackVersion: options.StackVersion,
PolicyName: options.PolicyName,
DataStream: options.DataStream,
RunSetup: options.RunSetup,
RunTestsOnly: options.RunTestsOnly,
RunTearDown: options.RunTearDown,
}
return NewKubernetesAgentDeployer(opts)
}
return nil, fmt.Errorf("unsupported agent deployer (name: %s)", agentDeployerName)
}

func selectAgentDeployerType(options FactoryOptions) (string, string, error) {
devDeployPath, err := FindDevDeployPath(options)
if errors.Is(err, os.ErrNotExist) {
return "default", "", nil
}
if err != nil {
return "", "", fmt.Errorf("can't find \"%s\" directory: %w", options.DevDeployDir, err)
}

agentDeployerNames, err := findAgentDeployers(devDeployPath)
if errors.Is(err, os.ErrNotExist) || len(agentDeployerNames) == 0 {
logger.Debugf("Not agent deployer found, using default one")
return "default", "", nil
}
if err != nil {
return "", "", fmt.Errorf("failed to find agent deployer: %w", err)
}
if len(agentDeployerNames) != 1 {
return "", "", fmt.Errorf("expected to find only one agent deployer in \"%s\"", devDeployPath)
}
agentDeployerName := agentDeployerNames[0]

// if package defines `_dev/deploy/docker` folder to start their services, it should be
// using the default agent deployer`
if agentDeployerName == "docker" || agentDeployerName == "tf" {
return "default", "", nil
}

// No need to check if this path exists because it comes from a directory list.
agentDeployerPath := filepath.Join(devDeployPath, agentDeployerName)
return agentDeployerName, agentDeployerPath, nil
}

// FindDevDeployPath function returns a path reference to the "_dev/deploy" directory.
func FindDevDeployPath(options FactoryOptions) (string, error) {
dataStreamDevDeployPath := filepath.Join(options.DataStreamRootPath, options.DevDeployDir)
if _, err := os.Stat(dataStreamDevDeployPath); err == nil {
info, err := os.Stat(dataStreamDevDeployPath)
if err == nil && info.IsDir() {
return dataStreamDevDeployPath, nil
} else if !errors.Is(err, os.ErrNotExist) {
} else if err != nil && !errors.Is(err, os.ErrNotExist) {
return "", fmt.Errorf("stat failed for data stream (path: %s): %w", dataStreamDevDeployPath, err)
}

packageDevDeployPath := filepath.Join(options.PackageRootPath, options.DevDeployDir)
if _, err := os.Stat(packageDevDeployPath); err == nil {
info, err = os.Stat(packageDevDeployPath)
if err == nil && info.IsDir() {
return packageDevDeployPath, nil
} else if !errors.Is(err, os.ErrNotExist) {
} else if err != nil && !errors.Is(err, os.ErrNotExist) {
return "", fmt.Errorf("stat failed for package (path: %s): %w", packageDevDeployPath, err)
}

return "", fmt.Errorf("\"%s\" directory doesn't exist", options.DevDeployDir)
return "", fmt.Errorf("\"%s\" %w", options.DevDeployDir, os.ErrNotExist)
}

func findAgentDeployer(devDeployPath string) (string, error) {
func findAgentDeployers(devDeployPath string) ([]string, error) {
fis, err := os.ReadDir(devDeployPath)
if err != nil {
return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployPath, err)
return nil, fmt.Errorf("can't read directory (path: %s): %w", devDeployPath, err)
}

var folders []os.DirEntry
Expand All @@ -129,8 +149,9 @@ func findAgentDeployer(devDeployPath string) (string, error) {
}
}

if len(folders) != 1 {
return "", fmt.Errorf("expected to find only one agent deployer in \"%s\"", devDeployPath)
var names []string
for _, folder := range folders {
names = append(names, folder.Name())
}
return folders[0].Name(), nil
return names, nil
}
87 changes: 49 additions & 38 deletions internal/benchrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,44 +231,13 @@ func (r *runner) setUp(ctx context.Context) error {
func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) {
var service servicedeployer.DeployedService
if r.scenario.Corpora.InputService != nil {
stackVersion, err := r.options.KibanaClient.Version()
if err != nil {
return nil, fmt.Errorf("cannot request Kibana version: %w", err)
}

// Setup service.
logger.Debug("setting up service...")
devDeployDir := filepath.Clean(filepath.Join(r.options.BenchPath, "deploy"))
opts := servicedeployer.FactoryOptions{
PackageRootPath: r.options.PackageRootPath,
DevDeployDir: devDeployDir,
Variant: r.options.Variant,
Profile: r.options.Profile,
Type: servicedeployer.TypeBench,
StackVersion: stackVersion.Version(),
DeployIndependentAgent: false,
}
serviceDeployer, err := servicedeployer.Factory(opts)

if err != nil {
return nil, fmt.Errorf("could not create service runner: %w", err)
}

r.svcInfo.Name = r.scenario.Corpora.InputService.Name
service, err = serviceDeployer.SetUp(ctx, r.svcInfo)
if err != nil {
return nil, fmt.Errorf("could not setup service: %w", err)
}

r.svcInfo = service.Info()
r.shutdownServiceHandler = func(ctx context.Context) error {
logger.Debug("tearing down service...")
if err := service.TearDown(ctx); err != nil {
return fmt.Errorf("error tearing down service: %w", err)
}

return nil
s, err := r.setupService(ctx)
if errors.Is(err, os.ErrNotExist) {
logger.Debugf("No service deployer defined for this benchmark")
} else if err != nil {
return nil, err
}
service = s
}

r.startMetricsColletion(ctx)
Expand All @@ -288,7 +257,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
}

// Signal to the service that the agent is ready (policy is assigned).
if r.scenario.Corpora.InputService != nil && r.scenario.Corpora.InputService.Signal != "" {
if service != nil && r.scenario.Corpora.InputService != nil && r.scenario.Corpora.InputService.Signal != "" {
if err = service.Signal(ctx, r.scenario.Corpora.InputService.Signal); err != nil {
return nil, fmt.Errorf("failed to notify benchmark service: %w", err)
}
Expand All @@ -314,6 +283,48 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
return createReport(r.options.BenchName, r.corporaFile, r.scenario, msum)
}

func (r *runner) setupService(ctx context.Context) (servicedeployer.DeployedService, error) {
stackVersion, err := r.options.KibanaClient.Version()
if err != nil {
return nil, fmt.Errorf("cannot request Kibana version: %w", err)
}

// Setup service.
logger.Debug("Setting up service...")
devDeployDir := filepath.Clean(filepath.Join(r.options.BenchPath, "deploy"))
opts := servicedeployer.FactoryOptions{
PackageRootPath: r.options.PackageRootPath,
DevDeployDir: devDeployDir,
Variant: r.options.Variant,
Profile: r.options.Profile,
Type: servicedeployer.TypeBench,
StackVersion: stackVersion.Version(),
DeployIndependentAgent: false,
}
serviceDeployer, err := servicedeployer.Factory(opts)
if err != nil {
return nil, fmt.Errorf("could not create service runner: %w", err)
}

r.svcInfo.Name = r.scenario.Corpora.InputService.Name
service, err := serviceDeployer.SetUp(ctx, r.svcInfo)
if err != nil {
return nil, fmt.Errorf("could not setup service: %w", err)
}

r.svcInfo = service.Info()
r.shutdownServiceHandler = func(ctx context.Context) error {
logger.Debug("tearing down service...")
if err := service.TearDown(ctx); err != nil {
return fmt.Errorf("error tearing down service: %w", err)
}

return nil
}

return service, nil
}

func (r *runner) startMetricsColletion(ctx context.Context) {
// TODO collect agent hosts metrics using system integration
r.mcollector = newCollector(
Expand Down
6 changes: 6 additions & 0 deletions internal/service/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package service

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -44,6 +45,10 @@ func BootUp(ctx context.Context, options Options) error {
StackVersion: options.StackVersion,
DeployIndependentAgent: false,
})
if errors.Is(err, os.ErrNotExist) {
fmt.Println("No service defined.")
return nil
}
if err != nil {
return fmt.Errorf("can't create the service deployer instance: %w", err)
}
Expand All @@ -67,6 +72,7 @@ func BootUp(ctx context.Context, options Options) error {
fmt.Println("Service is up, please use ctrl+c to take it down")
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(ch)
<-ch

// Tear down the service
Expand Down
2 changes: 1 addition & 1 deletion internal/servicedeployer/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func FindDevDeployPath(options FactoryOptions) (string, error) {
return "", fmt.Errorf("stat failed for package (path: %s): %w", packageDevDeployPath, err)
}

return "", fmt.Errorf("\"%s\" directory doesn't exist", options.DevDeployDir)
return "", fmt.Errorf("\"%s\" %w", options.DevDeployDir, os.ErrNotExist)
}

func findServiceDeployer(devDeployPath string) (string, error) {
Expand Down
34 changes: 19 additions & 15 deletions internal/testrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,20 @@ func (r *runner) initRun() error {
DataStreamRootPath: r.dataStreamPath,
DevDeployDir: DevDeployDir,
})
if err != nil {
return fmt.Errorf("_dev/deploy directory not found: %w", err)
switch {
case errors.Is(err, os.ErrNotExist):
r.variants = r.selectVariants(nil)
case err != nil:
return fmt.Errorf("failed fo find service deploy path: %w", err)
default:
variantsFile, err := servicedeployer.ReadVariantsFile(devDeployPath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("can't read service variant: %w", err)
}
r.variants = r.selectVariants(variantsFile)
}
if r.options.ServiceVariant != "" && len(r.variants) == 0 {
return fmt.Errorf("not found variant definition %q", r.options.ServiceVariant)
}

if r.options.ConfigFilePath != "" {
Expand All @@ -486,16 +498,6 @@ func (r *runner) initRun() error {
}
}

variantsFile, err := servicedeployer.ReadVariantsFile(devDeployPath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("can't read service variant: %w", err)
}

r.variants = r.selectVariants(variantsFile)
if r.options.ServiceVariant != "" && len(r.variants) == 0 {
return fmt.Errorf("not found variant definition %q", r.options.ServiceVariant)
}

return nil
}

Expand Down Expand Up @@ -804,7 +806,9 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf
scenario.agent = agentDeployed

service, svcInfo, err := r.setupService(ctx, config, serviceOptions, svcInfo, agentInfo, agentDeployed, serviceStateData)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.Debugf("No service deployer defined for this test")
} else if err != nil {
return nil, err
}

Expand Down Expand Up @@ -1000,7 +1004,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf
}

// Signal to the service that the agent is ready (policy is assigned).
if config.ServiceNotifySignal != "" {
if service != nil && config.ServiceNotifySignal != "" {
if err = service.Signal(ctx, config.ServiceNotifySignal); err != nil {
return nil, fmt.Errorf("failed to notify test service: %w", err)
}
Expand Down Expand Up @@ -1044,7 +1048,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf
return hits.size() > 0, nil
}, 1*time.Second, waitForDataTimeout)

if config.Service != "" && !config.IgnoreServiceError {
if service != nil && config.Service != "" && !config.IgnoreServiceError {
exited, code, err := service.ExitCode(ctx, config.Service)
if err != nil && !errors.Is(err, servicedeployer.ErrNotSupported) {
return nil, err
Expand Down
Loading