@@ -67,9 +67,8 @@ func NewService(
6767 }
6868}
6969
70- // ReloadYaml parses a YAML input (either a file path or a YAML string) and reloads a pipeline configuration.
71- // This can be used to reload a pipeline configuration without restarting Conduit.
72- func (s * Service ) ReloadYaml (ctx context.Context , yamlInput string ) (string , error ) {
70+ // parseYamlInput parses a YAML input (either a file path or a YAML string) and returns the first pipeline configuration.
71+ func (s * Service ) parseYamlInput (ctx context.Context , yamlInput string ) (config.Pipeline , string , error ) {
7372 // First, determine if the input is a file path or a YAML string
7473 isFilePath := false
7574 if ! strings .Contains (yamlInput , "\n " ) && len (yamlInput ) < 1024 {
@@ -78,75 +77,108 @@ func (s *Service) ReloadYaml(ctx context.Context, yamlInput string) (string, err
7877 }
7978 }
8079
81- var configs []config.Pipeline
82- var err error
8380 var source string
81+ var pipelineConfig config.Pipeline
8482
83+ // Parse the YAML input
8584 if isFilePath {
8685 s .logger .Debug (ctx ).
8786 Str ("file_path" , yamlInput ).
88- Msg ("reloading pipeline configuration from file" )
87+ Msg ("parsing pipeline configuration from file" )
8988
90- configs , err = s .parsePipelineConfigFile (ctx , yamlInput )
89+ // Parse the file and get the first pipeline configuration
90+ pipelineConfigs , err := s .parsePipelineConfigFile (ctx , yamlInput )
9191 if err != nil {
92- return "" , cerrors .Errorf ("failed to parse file %q: %w" , yamlInput , err )
92+ return config.Pipeline {}, "" , cerrors .Errorf ("failed to parse file %q: %w" , yamlInput , err )
93+ }
94+
95+ if len (pipelineConfigs ) == 0 {
96+ return config.Pipeline {}, "" , cerrors .New ("no pipelines found in the YAML file" )
9397 }
98+
99+ // Use the first pipeline from the file
100+ pipelineConfig = pipelineConfigs [0 ]
94101 source = yamlInput
95102 } else {
96103 s .logger .Debug (ctx ).
97- Msg ("reloading pipeline configuration from YAML string" )
104+ Msg ("parsing pipeline configuration from YAML string" )
98105
99106 reader := strings .NewReader (yamlInput )
100- configs , err = s .parser .Parse (ctx , reader )
107+ pipelineConfigs , err : = s .parser .Parse (ctx , reader )
101108 if err != nil {
102- return "" , cerrors .Errorf ("failed to parse YAML string: %w" , err )
109+ return config.Pipeline {}, "" , cerrors .Errorf ("failed to parse YAML string: %w" , err )
110+ }
111+
112+ if len (pipelineConfigs ) == 0 {
113+ return config.Pipeline {}, "" , cerrors .New ("no pipelines found in the YAML string" )
103114 }
104- source = "<yaml-string>"
105- }
106115
107- if len (configs ) == 0 {
108- return "" , cerrors .New ("no pipelines found in the YAML input" )
116+ // Use the first pipeline from the string
117+ pipelineConfig = pipelineConfigs [0 ]
118+ source = "<yaml-string>"
109119 }
110120
111- pipelineID := configs [0 ].ID
121+ // Extract the pipeline ID
122+ pipelineID := pipelineConfig .ID
112123 s .logger .Debug (ctx ).
113124 Str ("pipeline_id" , pipelineID ).
114- Msg ("using first pipeline from YAML" )
125+ Msg ("using pipeline from YAML" )
115126
116- return s . reloadPipeline ( ctx , configs , pipelineID , source )
127+ return pipelineConfig , source , nil
117128}
118129
119- // reloadPipeline is a helper function that handles the common logic for reloading a pipeline
120- // from either a file or a YAML string.
121- func (s * Service ) reloadPipeline (ctx context.Context , configs []config.Pipeline , pipelineID string , source string ) (string , error ) {
122- // Find the pipeline with the specified ID
123- var targetConfig config.Pipeline
124- found := false
125- for _ , cfg := range configs {
126- if cfg .ID == pipelineID {
127- targetConfig = cfg
128- found = true
129- break
130- }
130+ // UpsertYaml parses a YAML input (either a file path or a YAML string) and creates or updates a pipeline.
131+ // If the pipeline doesn't exist, it will be created. If it does exist, it will be updated.
132+ // This can be used to reload a pipeline configuration without restarting Conduit.
133+ // If the YAML contains multiple pipeline definitions, only the first one will be used.
134+ func (s * Service ) UpsertYaml (ctx context.Context , yamlInput string ) (string , error ) {
135+ // Parse the YAML input
136+ pipelineConfig , source , err := s .parseYamlInput (ctx , yamlInput )
137+ if err != nil {
138+ return "" , err
131139 }
132140
133- if ! found {
134- return "" , cerrors .Errorf ("pipeline with ID %q not found in source %q" , pipelineID , source )
135- }
141+ return s .reloadPipeline (ctx , pipelineConfig , source )
142+ }
143+
144+ // reloadPipeline is a helper function that handles the common logic for reloading a pipeline
145+ // from either a file or a YAML string. It will create a new pipeline if it doesn't exist,
146+ // or update an existing one if it does.
147+ func (s * Service ) reloadPipeline (ctx context.Context , pipelineConfig config.Pipeline , source string ) (string , error ) {
148+ // Extract the pipeline ID from the config
149+ pipelineID := pipelineConfig .ID
136150
137151 // Check if pipeline already exists
138152 pipelineInstance , err := s .pipelineService .Get (ctx , pipelineID )
139153 if err != nil {
140154 if cerrors .Is (err , pipeline .ErrInstanceNotFound ) {
141155 // Pipeline doesn't exist, create it
142- s .logger .Info (ctx ).
143- Str ("pipeline_id" , pipelineID ).
144- Msg ("pipeline doesn't exist, creating new pipeline" )
156+ return s .upsertPipeline (ctx , nil , pipelineConfig , source )
145157 } else {
146158 return "" , cerrors .Errorf ("error getting pipeline instance with ID %q: %w" , pipelineID , err )
147159 }
148160 } else {
149- // Pipeline exists, check if it was provisioned by config
161+ // Pipeline exists, update it
162+ return s .upsertPipeline (ctx , pipelineInstance , pipelineConfig , source )
163+ }
164+ }
165+
166+ // upsertPipeline creates a new pipeline or updates an existing one with the given configuration.
167+ // If pipelineInstance is nil, a new pipeline will be created. Otherwise, the existing pipeline will be updated.
168+ func (s * Service ) upsertPipeline (ctx context.Context , pipelineInstance * pipeline.Instance , pipelineConfig config.Pipeline , source string ) (string , error ) {
169+ pipelineID := pipelineConfig .ID
170+
171+ // Check if we're creating a new pipeline or updating an existing one
172+ isCreate := pipelineInstance == nil
173+
174+ if isCreate {
175+ // Creating a new pipeline
176+ s .logger .Info (ctx ).
177+ Str ("pipeline_id" , pipelineID ).
178+ Msg ("pipeline doesn't exist, creating new pipeline" )
179+ } else {
180+ // Updating an existing pipeline
181+ // Check if the pipeline was provisioned by config
150182 if pipelineInstance .ProvisionedBy != pipeline .ProvisionTypeConfig {
151183 return "" , cerrors .Errorf ("pipeline with ID %q was not provisioned by config: %w" , pipelineID , ErrNotProvisionedByConfig )
152184 }
@@ -158,9 +190,15 @@ func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline,
158190 Str ("pipeline_id" , pipelineID ).
159191 Msg ("stopping pipeline before updating configuration" )
160192
161- err = s .lifecycleService .Stop (ctx , pipelineID , false )
193+ err : = s .lifecycleService .Stop (ctx , pipelineID , false )
162194 if err != nil {
163- return "" , cerrors .Errorf ("could not stop pipeline %q before updating: %w" , pipelineID , err )
195+ // Ignore the error if the pipeline is not running
196+ if ! strings .Contains (err .Error (), "pipeline not running" ) {
197+ return "" , cerrors .Errorf ("could not stop pipeline %q before updating: %w" , pipelineID , err )
198+ }
199+ s .logger .Debug (ctx ).
200+ Str ("pipeline_id" , pipelineID ).
201+ Msg ("pipeline was not running, continuing with update" )
164202 }
165203 }
166204
@@ -170,26 +208,34 @@ func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline,
170208 Msg ("deleting existing pipeline before recreating" )
171209
172210 // Use s.Delete to properly clean up the pipeline
173- err = s .Delete (ctx , pipelineID )
211+ err : = s .Delete (ctx , pipelineID )
174212 if err != nil {
175213 return "" , cerrors .Errorf ("could not delete existing pipeline %q: %w" , pipelineID , err )
176214 }
177215 }
178216
179- // Provision the pipeline with the new configuration
217+ // Provision the pipeline with the configuration
180218 s .logger .Debug (ctx ).
181219 Str ("pipeline_id" , pipelineID ).
182- Msg ("provisioning pipeline with updated configuration " )
220+ Msg ("provisioning pipeline" )
183221
184- err = s .provisionPipeline (ctx , targetConfig )
222+ err : = s .provisionPipeline (ctx , pipelineConfig )
185223 if err != nil {
186224 return "" , cerrors .Errorf ("pipeline %q, error while provisioning: %w" , pipelineID , err )
187225 }
188226
189- s .logger .Info (ctx ).
190- Str ("pipeline_id" , pipelineID ).
191- Str ("source" , source ).
192- Msg ("pipeline configuration reloaded successfully" )
227+ // Log success message based on whether we created or updated the pipeline
228+ if isCreate {
229+ s .logger .Info (ctx ).
230+ Str ("pipeline_id" , pipelineID ).
231+ Str ("source" , source ).
232+ Msg ("pipeline created successfully" )
233+ } else {
234+ s .logger .Info (ctx ).
235+ Str ("pipeline_id" , pipelineID ).
236+ Str ("source" , source ).
237+ Msg ("pipeline configuration updated successfully" )
238+ }
193239
194240 return pipelineID , nil
195241}
0 commit comments