Skip to content

Commit 187fd07

Browse files
committed
add functionlity to hot reload yaml configs
1 parent 42bee96 commit 187fd07

File tree

2 files changed

+295
-0
lines changed

2 files changed

+295
-0
lines changed

pkg/provisioning/service.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,133 @@ func NewService(
6767
}
6868
}
6969

70+
// ReloadYaml parses a YAML input (either a file path or a YAML string) and reloads a pipeline configuration.
71+
// This can be used to reload a pipeline configuration without restarting Conduit.
72+
func (s *Service) ReloadYaml(ctx context.Context, yamlInput string) (string, error) {
73+
// First, determine if the input is a file path or a YAML string
74+
isFilePath := false
75+
if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 {
76+
if _, err := os.Stat(yamlInput); err == nil {
77+
isFilePath = true
78+
}
79+
}
80+
81+
var configs []config.Pipeline
82+
var err error
83+
var source string
84+
85+
if isFilePath {
86+
s.logger.Debug(ctx).
87+
Str("file_path", yamlInput).
88+
Msg("reloading pipeline configuration from file")
89+
90+
configs, err = s.parsePipelineConfigFile(ctx, yamlInput)
91+
if err != nil {
92+
return "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err)
93+
}
94+
source = yamlInput
95+
} else {
96+
s.logger.Debug(ctx).
97+
Msg("reloading pipeline configuration from YAML string")
98+
99+
reader := strings.NewReader(yamlInput)
100+
configs, err = s.parser.Parse(ctx, reader)
101+
if err != nil {
102+
return "", cerrors.Errorf("failed to parse YAML string: %w", err)
103+
}
104+
source = "<yaml-string>"
105+
}
106+
107+
if len(configs) == 0 {
108+
return "", cerrors.New("no pipelines found in the YAML input")
109+
}
110+
111+
pipelineID := configs[0].ID
112+
s.logger.Debug(ctx).
113+
Str("pipeline_id", pipelineID).
114+
Msg("using first pipeline from YAML")
115+
116+
return s.reloadPipeline(ctx, configs, pipelineID, source)
117+
}
118+
119+
// reloadPipeline is a helper function that handles the common logic for reloading a pipeline
120+
// from either a file or a YAML string.
121+
func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline, pipelineID string, source string) (string, error) {
122+
// Find the pipeline with the specified ID
123+
var targetConfig config.Pipeline
124+
found := false
125+
for _, cfg := range configs {
126+
if cfg.ID == pipelineID {
127+
targetConfig = cfg
128+
found = true
129+
break
130+
}
131+
}
132+
133+
if !found {
134+
return "", cerrors.Errorf("pipeline with ID %q not found in source %q", pipelineID, source)
135+
}
136+
137+
// Check if pipeline already exists
138+
pipelineInstance, err := s.pipelineService.Get(ctx, pipelineID)
139+
if err != nil {
140+
if cerrors.Is(err, pipeline.ErrInstanceNotFound) {
141+
// Pipeline doesn't exist, create it
142+
s.logger.Info(ctx).
143+
Str("pipeline_id", pipelineID).
144+
Msg("pipeline doesn't exist, creating new pipeline")
145+
} else {
146+
return "", cerrors.Errorf("error getting pipeline instance with ID %q: %w", pipelineID, err)
147+
}
148+
} else {
149+
// Pipeline exists, check if it was provisioned by config
150+
if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig {
151+
return "", cerrors.Errorf("pipeline with ID %q was not provisioned by config: %w", pipelineID, ErrNotProvisionedByConfig)
152+
}
153+
154+
// Check if pipeline is running and stop it if needed
155+
pipelineWasRunning := pipelineInstance.GetStatus() == pipeline.StatusRunning
156+
if pipelineWasRunning {
157+
s.logger.Debug(ctx).
158+
Str("pipeline_id", pipelineID).
159+
Msg("stopping pipeline before updating configuration")
160+
161+
err = s.lifecycleService.Stop(ctx, pipelineID, false)
162+
if err != nil {
163+
return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err)
164+
}
165+
}
166+
167+
// Delete the existing pipeline
168+
s.logger.Debug(ctx).
169+
Str("pipeline_id", pipelineID).
170+
Msg("deleting existing pipeline before recreating")
171+
172+
// Use s.Delete to properly clean up the pipeline
173+
err = s.Delete(ctx, pipelineID)
174+
if err != nil {
175+
return "", cerrors.Errorf("could not delete existing pipeline %q: %w", pipelineID, err)
176+
}
177+
}
178+
179+
// Provision the pipeline with the new configuration
180+
s.logger.Debug(ctx).
181+
Str("pipeline_id", pipelineID).
182+
Msg("provisioning pipeline with updated configuration")
183+
184+
err = s.provisionPipeline(ctx, targetConfig)
185+
if err != nil {
186+
return "", cerrors.Errorf("pipeline %q, error while provisioning: %w", pipelineID, err)
187+
}
188+
189+
s.logger.Info(ctx).
190+
Str("pipeline_id", pipelineID).
191+
Str("source", source).
192+
Msg("pipeline configuration reloaded successfully")
193+
194+
return pipelineID, nil
195+
}
196+
70197
// Init provision pipelines defined in pipelinePath directory. should initialize pipeline service
71198
// before calling this function, and all pipelines should be stopped.
72199
func (s *Service) Init(ctx context.Context) error {

pkg/provisioning/service_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package provisioning
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"os"
2021
"testing"
2122
"time"
@@ -479,6 +480,173 @@ func TestService_Delete(t *testing.T) {
479480
is.NoErr(err)
480481
}
481482

483+
func TestService_ReloadYaml(t *testing.T) {
484+
is := is.New(t)
485+
ctx, cancel := context.WithCancel(context.Background())
486+
defer cancel()
487+
488+
// Add cleanup for output files that will be created during the test
489+
t.Cleanup(func() {
490+
for _, path := range []string{"./output-v1.txt", "./output-v2.txt", "./output-v3.txt", "./input.txt"} {
491+
_ = os.Remove(path)
492+
}
493+
})
494+
495+
// Set up a real service with non-mock components for actual runtime verification
496+
logger := log.InitLogger(zerolog.InfoLevel, log.FormatCLI)
497+
logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{})
498+
499+
// Create a temporary database
500+
db, err := badger.New(logger.Logger, t.TempDir()+"/test.db")
501+
is.NoErr(err)
502+
t.Cleanup(func() {
503+
err := db.Close()
504+
is.NoErr(err)
505+
})
506+
507+
// Set up the necessary services
508+
tokenService := connutils.NewAuthManager()
509+
schemaRegistry, err := schemaregistry.NewSchemaRegistry(db)
510+
is.NoErr(err)
511+
connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, tokenService)
512+
513+
// Set up plugin services
514+
connPluginService := conn_plugin.NewPluginService(
515+
logger,
516+
builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, connSchemaService),
517+
standalone.NewRegistry(logger, ""),
518+
tokenService,
519+
)
520+
connPluginService.Init(ctx, "conn-utils-token:12345")
521+
522+
procPluginService := proc_plugin.NewPluginService(
523+
logger,
524+
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
525+
nil,
526+
)
527+
528+
// Set up core services
529+
plService := pipeline.NewService(logger, db)
530+
connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
531+
procService := processor.NewService(logger, db, procPluginService)
532+
533+
// Set up lifecycle service
534+
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
535+
MinDelay: time.Second,
536+
MaxDelay: 10 * time.Minute,
537+
BackoffFactor: 2,
538+
MaxRetries: 0,
539+
MaxRetriesWindow: 5 * time.Minute,
540+
}
541+
lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService)
542+
543+
// Create the provisioning service
544+
service := NewService(db, logger, plService, connService, procService, connPluginService, lifecycleService, "")
545+
546+
// Define the base YAML template
547+
yamlTemplate := `version: 2.0
548+
pipelines:
549+
- id: test-pipeline
550+
status: running
551+
name: Test Pipeline
552+
description: %s
553+
connectors:
554+
- id: source
555+
type: source
556+
plugin: builtin:file
557+
settings:
558+
path: ./input.txt
559+
- id: destination
560+
type: destination
561+
plugin: builtin:file
562+
settings:
563+
path: %s`
564+
565+
// Define test cases
566+
type testCase struct {
567+
name string
568+
source string
569+
sourceType string
570+
description string
571+
outputPath string
572+
}
573+
574+
testCases := []testCase{
575+
{
576+
name: "Initial configuration",
577+
sourceType: "string",
578+
description: "Initial configuration",
579+
outputPath: "./output-v1.txt",
580+
},
581+
{
582+
name: "Update via YAML string",
583+
sourceType: "string",
584+
description: "Updated configuration via string",
585+
outputPath: "./output-v2.txt",
586+
},
587+
{
588+
name: "Update via YAML file",
589+
sourceType: "file",
590+
description: "Updated configuration via file",
591+
outputPath: "./output-v3.txt",
592+
},
593+
}
594+
595+
// Create a temporary file for the file-based test case
596+
tmpfile, err := os.CreateTemp("", "test-pipeline-*.yaml")
597+
is.NoErr(err)
598+
defer os.Remove(tmpfile.Name())
599+
600+
// Run each test case
601+
for i, tc := range testCases {
602+
// Add a small delay between test cases to ensure the pipeline has time to stabilize
603+
if i > 0 {
604+
time.Sleep(1 * time.Second)
605+
}
606+
t.Logf("Running test case %d: %s", i+1, tc.name)
607+
608+
// Generate the YAML content
609+
yamlContent := fmt.Sprintf(yamlTemplate, tc.description, tc.outputPath)
610+
611+
// Set up the source based on the source type
612+
var source string
613+
if tc.sourceType == "file" {
614+
// Write the YAML content to the file
615+
err = os.WriteFile(tmpfile.Name(), []byte(yamlContent), 0o644)
616+
is.NoErr(err)
617+
source = tmpfile.Name()
618+
} else {
619+
source = yamlContent
620+
}
621+
622+
// Reload the pipeline with the current configuration
623+
pipelineID, err := service.ReloadYaml(ctx, source)
624+
is.NoErr(err)
625+
is.Equal(pipelineID, "test-pipeline")
626+
627+
// Give the pipeline time to update
628+
time.Sleep(500 * time.Millisecond)
629+
630+
// Verify the configuration was applied correctly
631+
pipeline, err := plService.Get(ctx, "test-pipeline")
632+
is.NoErr(err)
633+
is.Equal(pipeline.Config.Name, "Test Pipeline")
634+
is.Equal(pipeline.Config.Description, tc.description)
635+
636+
// Get the destination connector and verify its path setting
637+
destConnID := ""
638+
for _, connID := range pipeline.ConnectorIDs {
639+
conn, err := connService.Get(ctx, connID)
640+
is.NoErr(err)
641+
if conn.ID == "test-pipeline:destination" {
642+
destConnID = conn.ID
643+
is.Equal(conn.Config.Settings["path"], tc.outputPath)
644+
}
645+
}
646+
is.True(destConnID != "")
647+
}
648+
}
649+
482650
func TestService_IntegrationTestServices(t *testing.T) {
483651
is := is.New(t)
484652
ctx, killAll := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)