
Conversation

@adamruzicka
Contributor

@adamruzicka adamruzicka commented May 17, 2024

This commit enables execution plans to be chained. Assuming there is an execution plan EP1, another execution plan EP2 can be chained onto EP1. When chained, EP2 will stay in the scheduled state until EP1 reaches the stopped state. An execution plan can be chained onto multiple prerequisite execution plans, in which case it will run once all the prerequisite execution plans are stopped.

It builds on mechanisms which were already present. When an execution plan is chained, it behaves the same way as if it were scheduled for future execution. A record is created in dynflow_delayed_table and once the conditions for it to execute are right, it is dispatched by the delayed executor. Because of this, there might be a small delay between when the prerequisites finish and the chained plan is started.

TODOs:

  • somehow show the prerequisite execution plans in the console
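Conceptually, the behavior can be sketched as a toy model in plain Ruby (all names here are illustrative, not Dynflow's actual API): a chained plan sits in the scheduled state and only becomes ready once all of its prerequisites are stopped.

```ruby
# Toy model of the chaining semantics described above; all names here are
# illustrative and are NOT part of Dynflow's actual API.
ExecutionPlan = Struct.new(:id, :state)

class ToyDelayedExecutor
  def initialize
    # chained plan id => list of prerequisite plan ids
    @dependencies = Hash.new { |h, k| h[k] = [] }
  end

  # Chain `plan` onto one or more prerequisite plans
  def chain(plan, prerequisites)
    plan.state = :scheduled
    @dependencies[plan.id].concat(prerequisites.map(&:id))
    plan
  end

  # A chained plan becomes ready only once ALL prerequisites are stopped
  def ready?(plan, all_plans)
    prereqs = all_plans.select { |p| @dependencies[plan.id].include?(p.id) }
    prereqs.map(&:state).all? { |s| s == :stopped }
  end
end

ep1 = ExecutionPlan.new("EP1", :running)
ep2 = ExecutionPlan.new("EP2", :running)
ep3 = ExecutionPlan.new("EP3", :pending)

executor = ToyDelayedExecutor.new
executor.chain(ep3, [ep1, ep2])

ep1.state = :stopped
executor.ready?(ep3, [ep1, ep2]) # => false, EP2 is still running
ep2.state = :stopped
executor.ready?(ep3, [ep1, ep2]) # => true
```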

@ianballou

This will be helpful for solving the issue in Katello where a composite content view auto publish can be triggered for content views that are still publishing. They are separate execution chains that need to rely on each other.

@ianballou

I'm finally starting to look into the Katello issue more seriously, so I'm going to test this out.

end

def find_past_delayed_plans(time)
def find_ready_delayed_plans(time)


I had an issue where one composite CV publish was waiting on two child component CV publishes. However, the composite task would still start after the quicker child finished.

I don't know Dynflow super well, so I employed some AI tool help:


/begin robot

Issue: The find_ready_delayed_plans query in lib/dynflow/persistence_adapters/sequel.rb had a bug when handling execution plans with multiple dependencies.

It would return a delayed plan as "ready" if ANY dependency was stopped, instead of waiting for ALL dependencies to stop.

Root Cause: The original query used LEFT JOINs:

LEFT JOIN dependencies ON delayed.uuid = dependencies.execution_plan_uuid
LEFT JOIN execution_plans ON dependencies.blocked_by_uuid = execution_plans.uuid
WHERE (state IS NULL OR state = 'stopped')

With multiple dependencies (e.g., plan D depends on A and B):

If A is 'running' and B is 'stopped', the LEFT JOIN produces 2 rows
The WHERE clause filters out the row with A ('running')
But keeps the row with B ('stopped')
Result: D is returned as "ready" even though A is still running

Fix: Changed to NOT EXISTS subquery to ensure NO dependencies are in a non-stopped state:

WHERE NOT EXISTS (
  SELECT 1 FROM dependencies
  LEFT JOIN execution_plans ON dependencies.blocked_by_uuid = execution_plans.uuid
  WHERE dependencies.execution_plan_uuid = delayed.execution_plan_uuid
  AND execution_plans.state IS NOT NULL
  AND execution_plans.state != 'stopped'
)

Result: Chained execution plans now correctly wait for ALL dependencies to complete before running, as documented in the original PR description.

/end robot


I tested this out, and afterwards the publish did indeed wait properly for the slower child to finish.

It's possible I'm using this chaining method incorrectly in my development branch, but let me know what you think of the above.
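For what it's worth, the ANY-vs-ALL failure mode described above can be reproduced in plain Ruby by simulating the two query shapes over in-memory rows (hypothetical data, not the actual Sequel adapter):

```ruby
# Simulated tables: plan D is delayed and depends on A (running) and B (stopped)
delayed = [{ execution_plan_uuid: "D" }]
dependencies = [
  { execution_plan_uuid: "D", blocked_by_uuid: "A" },
  { execution_plan_uuid: "D", blocked_by_uuid: "B" },
]
states = { "A" => "running", "B" => "stopped" }

# Old query shape: LEFT JOIN + WHERE keeps D as long as ANY joined row matches
def ready_old(delayed, dependencies, states)
  delayed.select do |plan|
    rows = dependencies.select { |d| d[:execution_plan_uuid] == plan[:execution_plan_uuid] }
    rows = [{}] if rows.empty? # LEFT JOIN keeps the plan even without dependencies
    rows.any? { |d| s = states[d[:blocked_by_uuid]]; s.nil? || s == "stopped" }
  end
end

# Fixed query shape: NOT EXISTS rejects D if ANY dependency is not stopped
def ready_fixed(delayed, dependencies, states)
  delayed.reject do |plan|
    dependencies.any? do |d|
      d[:execution_plan_uuid] == plan[:execution_plan_uuid] &&
        !states[d[:blocked_by_uuid]].nil? &&
        states[d[:blocked_by_uuid]] != "stopped"
    end
  end
end

ready_old(delayed, dependencies, states)   # D is wrongly returned as ready
ready_fixed(delayed, dependencies, states) # empty: D correctly stays blocked
```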


So:

diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb
index b36298b..502aadf 100644
--- a/lib/dynflow/persistence_adapters/sequel.rb
+++ b/lib/dynflow/persistence_adapters/sequel.rb
@@ -146,14 +146,22 @@ module Dynflow
 
       def find_ready_delayed_plans(time)
         table_name = :delayed
+        # Find delayed plans where ALL dependencies (if any) are either non-existent or stopped
+        # We use NOT EXISTS to ensure no dependency is in a non-stopped state
         table(table_name)
-          .left_join(TABLES[:execution_plan_dependency], execution_plan_uuid: :execution_plan_uuid)
-          .left_join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
           .where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time))
-          .where(::Sequel[{ state: nil }] | ::Sequel[{ state: 'stopped' }])
           .where(:frozen => false)
+          .where(::Sequel.lit(
+            "NOT EXISTS (
+              SELECT 1
+              FROM #{TABLES[:execution_plan_dependency]} dep
+              LEFT JOIN #{TABLES[:execution_plan]} ep ON dep.blocked_by_uuid = ep.uuid
+              WHERE dep.execution_plan_uuid = #{TABLES[table_name]}.execution_plan_uuid
+              AND ep.state IS NOT NULL
+              AND ep.state != 'stopped'
+            )"
+          ))
           .order_by(:start_at)
-          .select_all(TABLES[table_name])
           .all
           .map { |plan| load_data(plan, table_name) }
       end

Contributor Author

Oh yeah, this was put together rather quickly, I'll have to take a look at this again

Contributor Author

And the suggestion looks reasonable, although I'll try to reduce raw SQL as much as possible


@ianballou ianballou Oct 23, 2025


Oh yeah, this was put together rather quickly, I'll have to take a look at this again

It seems to work well for being a prototype!

@ianballou

I'm planning on submitting a Foreman Tasks PR for integration there. In Katello I'm currently including some ForemanTasks add-ons for the proof of concept.

@ianballou

Is there a good way to get the input data from the scheduled chained task? So far it looks like the inputs aren't available yet so they need to be grabbed from ForemanTasks.dynflow.world.persistence.load_delayed_plan(task.external_id).args.

@adamruzicka
Contributor Author

Is there a good way to get the input data

How would you define input data? What you get from ForemanTasks.dynflow.world.persistence.load_delayed_plan(task.external_id).args are the arguments that will end up being passed to #plan. The root action's #input may end up being different after the whole planning goes through.

What would be the use case?

@ianballou

ianballou commented Oct 24, 2025

Is there a good way to get the input data from the scheduled chained task? So far it looks like the inputs aren't available yet so they need to be grabbed from ForemanTasks.dynflow.world.persistence.load_delayed_plan(task.external_id).args.

Is there a good way to get the input data

How would you define input data? What you get from ForemanTasks.dynflow.world.persistence.load_delayed_plan(task.external_id).args are the arguments that will end up being passed to #plan. The root action's #input may end up being different after the whole planning goes through.

What would be the use case?

Okay, bear with me here.

When publishing component 2+ CVs that are part of a composite with auto publish enabled, 2+ composite CV publishes will be triggered. This is bad because the latter publish will error out on the Lock being taken. Plus, since the parent composite publish will wait on its publishing component children, only 1 composite CV publish is needed.

So, my implementation in Katello needs to look up the chained & scheduled composite CV publish task in order to skip auto publishing if there is already a task waiting. And thus I need to access the arguments that were passed to plan (which I was calling the input, but I suppose that is different from the actual task input). From the arguments I can find the content view ID to ensure that it is the composite waiting to be published.

@ianballou

ianballou commented Oct 24, 2025

So, my implementation in Katello needs to look up the chained & scheduled composite CV publish task in order to skip auto publishing if there is already a task waiting.

I am rethinking this. If a child publishes while the parent is publishing, we would want a new CCV version to be published. That would mean chaining the new composite content view publish on the scheduled/running one. However, when I tried this, there were Lock errors. Could it work such that the lock checking is deferred to when the scheduled task actually runs?

Katello has a workaround for this via the Event queue, and it polls on lock failures. When it was implemented 6 years ago, though, chaining in Dynflow was wished for :) Katello/katello#8188

For now I'm going to just poll on the locking error for this one particular case, it is a bit of a corner case I think.

@ianballou

Here is the related Katello PR: Katello/katello#11540

@ianballou

From a meeting about this and Katello/katello#11540, the following was determined to be added to this PR:

  1. Some very basic UI details about the chain information
  2. Error handling so that a chained task is removed from the schedule if a dependency fails (optional, but on by default)
  3. The OR relationship between different chained dependencies turning into AND, which is related to Add execution plan chaining #446 (comment)

@adamruzicka , feel free to correct me if the expected outcome was different.

@ianballou

I have a quick AI prototype of how to show the info in the Dynflow console:

[screenshot]
diff --git a/lib/dynflow/execution_plan.rb b/lib/dynflow/execution_plan.rb
index e2dd251..b948eba 100644
--- a/lib/dynflow/execution_plan.rb
+++ b/lib/dynflow/execution_plan.rb
@@ -432,7 +432,7 @@ module Dynflow
     end
 
     def to_hash
-      recursive_to_hash id:                id,
+      hash = recursive_to_hash id:                id,
                         class:             self.class.to_s,
                         label:             label,
                         state:             state,
@@ -446,6 +446,22 @@ module Dynflow
                         execution_time:    execution_time,
                         real_time:         real_time,
                         execution_history: execution_history.to_hash
+
+      # Add dependency information for chained execution plans
+      if state == :scheduled
+        deps = load_dependencies
+        hash[:dependencies] = deps if deps.any?
+      end
+
+      hash
+    end
+
+    def load_dependencies
+      persistence.adapter.db[:dynflow_execution_plan_dependencies]
+        .where(execution_plan_uuid: id)
+        .select_map(:blocked_by_uuid)
+    rescue
+      []
     end
 
     def save
diff --git a/web/views/show.erb b/web/views/show.erb
index 7e895f6..efbf130 100644
--- a/web/views/show.erb
+++ b/web/views/show.erb
@@ -31,6 +31,18 @@
     <b>Start before:</b>
     <%= h(@plan.delay_record.start_before || "-") %>
   </p>
+  <% dependencies = @plan.load_dependencies %>
+  <% if dependencies.any? %>
+    <p>
+      <b>Waiting for:</b>
+      <%= dependencies.size %> task<%= dependencies.size > 1 ? 's' : '' %> to complete
+    </p>
+    <p style="margin-left: 20px;">
+      <% dependencies.each do |dep_id| %>
+        <a href="<%= url("/#{dep_id}") %>"><%= h(dep_id) %></a><br/>
+      <% end %>
+    </p>
+  <% end %>
 <% end %>

@adamruzicka
Contributor Author

@ianballou I borrowed bits and pieces from what you suggested here, could you please take it for a spin?

@ianballou

@adamruzicka I've tested this a bunch now with Katello/katello#11540.

It seems to be working really well. The scheduled task waits on its children and cancels itself if one of them fails.

Do you need me to perform a code review as well? Or can we have someone with an outside opinion come do that?

@ofedoren ofedoren left a comment

Few cents, can be ignored:

end
end

::Dynflow::Persistence.prepend ::Dynflow::Debug::Persistence


I might be missing something, but shouldn't it use ::Dynflow::Debug::Telemetry::Persistence instead? And if so, I think the whole file is not being loaded then since there was no error to imply that...

Contributor Author

Probably? But let's address that elsewhere, tracked as #463

end
foreign_key :execution_plan_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
foreign_key :blocked_by_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
index :blocked_by_uuid


Maybe it's worth also indexing :execution_plan_uuid?

Contributor Author

Added

module Debug
module Telemetry
module Persistence
methods = [


Why does this list contain quite a few duplicates?

Contributor Author

Same as #446 (comment)

@adamruzicka
Contributor Author

and cancels itself if one of them fails.

That wasn't there before, I just added that.

Do you need me to perform a code review as well?

I won't say no to that offer

@adamruzicka adamruzicka force-pushed the chaining branch 3 times, most recently from 14a748f to c5b835c on December 1, 2025 08:41
@ianballou

and cancels itself if one of them fails.

That wasn't there before, I just added that.

Strange, I wonder what canceled them then. Anyway, I'll give this a re-test.

<%= h(@plan.ended_at) %>
</p>

<% if @plan.state == :scheduled && @plan.delay_record %>


Looking nice:
[screenshot]


Don't know if it's presentable on the UI, but since the chaining is displayed only while the chained task is scheduled and waiting, the information is lost from the Dynflow UI after the job runs. Might be good to persist the chain info in the UI for debugging reasons?

@ianballou ianballou Dec 5, 2025

Do you mean like, for example, change from "Waiting for execution plans:" to "Waited for execution plans:" once the task actually runs? And start showing the old chained plans, of course.

Contributor Author

@adamruzicka adamruzicka Dec 9, 2025

Changed, in the end I went with static "depends on"

if plan.start_before.nil?
blocker_ids = world.persistence.find_execution_plan_dependencies(execution_plan_id)
statuses = world.persistence.find_execution_plan_statuses({ filters: { uuid: blocker_ids } })
failed = statuses.select { |_uuid, status| status[:state] == 'stopped' && status[:result] == 'error' }


To test this I tried the following:

  1. Create a chained composite CV task by publishing two child content views
  2. Force cancel one of the dependencies, which left the task in a funny state "stopped - pending"
  3. Force canceling didn't seem to trigger the chained task to run, so I did the following on the Force cancelled task:
ForemanTasks::Task.where(id: '63fcef24-bc37-4445-9221-44382f216442').update(result: 'error')

After that, I noticed the chained task actually started running - I thought it would halt itself with an error.

Is my test here flawed somehow?


Actually - @sjha4 in your testing, this might be good to try to reproduce. Maybe I just had a timing issue.

Contributor Author

This is weird. Updating the ForemanTasks::Task object should have no impact on anything as that's completely external to dynflow.


def failed_dependencies(uuids)
bullets = uuids.map { |u| "- #{u}" }.join("\n")
msg = "Execution plan could not be started because some of its preqrequisite execution plans failed:\n#{bullets}"


Typo in prerequisite

Contributor Author

Fixed

@ianballou ianballou left a comment

Just some less important comments, generally this is looking fine to me and has been working well in testing.

# Subquery to find delayed plans that have at least one non-stopped dependency
plans_with_unfinished_deps = table(:execution_plan_dependency)
.join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
.where(::Sequel.~(state: 'stopped'))


Just to check - when you force unlock a task, does it go to the 'stopped' state? If it doesn't, we might need a workflow for unlinking the scheduled task from the one that was force unlocked.


Actually, my comment above says it goes to the stopped state. In which case, since it doesn't go to stopped - error, I believe the parent chained task should start running. I'm not sure how feasible it would be to cause force unlock to unschedule chained parents.
I'd be okay with force unlock continuing to run the parent tasks since it's pretty much a debug action.
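For reference, the stopped-and-error check from the review snippet can be exercised standalone with hypothetical status hashes; a force-unlocked plan ends up stopped without an error result, so it is not treated as failed:

```ruby
# Mirrors the `failed` selection from the review snippet; the statuses below
# are hypothetical examples, not real Dynflow data.
statuses = {
  "uuid-1" => { state: 'stopped', result: 'error' },   # genuinely failed
  "uuid-2" => { state: 'stopped', result: 'success' }, # e.g. finished or force-unlocked
  "uuid-3" => { state: 'running', result: 'pending' }, # still in flight
}

failed = statuses.select { |_uuid, status| status[:state] == 'stopped' && status[:result] == 'error' }
failed.keys # => ["uuid-1"]
```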

Contributor Author

Actually, my comment above says it goes to the stopped state.

It does

I'm not sure how feasible it would be to cause force unlock to unschedule chained parents.

It would probably be on the more difficult end of the spectrum, so I'd prefer to not go down that path.


_(plan2.errors.first.message).must_match(/#{plan1.id}/)
end

it 'cancels the chained plan if at least one prerequisite fails' do


Is it worth adding a test as well for force unlocking a prerequisite task?

Contributor Author

Added

<% dependencies = @plan.world.persistence.find_execution_plan_dependencies(@plan.id) %>
<% if dependencies.any? %>
<p>
<b>Depends on execution plans:</b>
@ianballou ianballou Dec 12, 2025

@sjha4 it looks like what you asked for, a permanent dependency history, is here now :)


Awesome! It was hard to test the dependencies on the UI without this :)

@ianballou

The only thing I haven't been able to test yet is multiple dependencies showing up. I'd like to get that going first, and then we should be good to merge here. Testing has been solid so far.

save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false
end

def chain_execution_plan(first, second)


This patch:

diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb
index c673090..0de3ea7 100644
--- a/lib/dynflow/persistence_adapters/sequel.rb
+++ b/lib/dynflow/persistence_adapters/sequel.rb
@@ -196,7 +196,11 @@ module Dynflow
       end
 
       def chain_execution_plan(first, second)
-        save :execution_plan_dependency, { execution_plan_uuid: second }, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false
+        # Insert dependency directly without checking for existing records.
+        # The table is designed to allow multiple dependencies per execution plan.
+        # Using save() causes upsert behavior that overwrites existing dependencies.
+        record = { execution_plan_uuid: second, blocked_by_uuid: first }
+        with_retry { table(:execution_plan_dependency).insert(record) }
       end
 
       def load_step(execution_plan_id, step_id)

Caused multiple dependencies to start showing up for me. I'm unsure if it's safe to do the inserts here like this, but it worked around the upserting causing trouble.

Contributor Author

I took a different approach, but it should be fixed.

Scheduled[execution_plan.id]
end

def chain(plan_uuids, action_class, *args)


I think I'm seeing the chaining only keeping one of the chained tasks instead of them all.

I tested publishing 3 content views. The first takes a really long time. After publishing them in order of speed, I sometimes see that only one of the faster content views was made as a dependency of the composite content view publish.

I thought it could be a Katello PR issue, but after adding debug logging, I found I was passing in two children, yet the slower child was not waited on (and the new Dynflow chaining UI showed this).

Claude dug around in the code a bit, and looked at:

https://github.com/Dynflow/dynflow/blob/3dea62325aacc96f8df8117f88657970c61f2836/lib/dynflow/persistence_adapters/sequel.rb#L367C1-L386C10

      def save(what, condition, value, with_data: true, update_conditions: {})
        table           = table(what)
        existing_record = with_retry { table.first condition } unless condition.empty?

        if value
          record = prepare_record(what, value, (existing_record || condition), with_data)
          if existing_record
            record = prune_unchanged(what, existing_record, record)
            return value if record.empty?
            condition = update_conditions.merge(condition)
            return with_retry { table.where(condition).update(record) }  <--------
          else
            with_retry { table.insert record }
          end

        else
          existing_record and with_retry { table.where(condition).delete }
        end
        value
      end

It's suggesting that the upsert logic in here is causing the other chained dependencies to be overwritten. I tried getting around the upsert logic and only then did I see multiple dependencies in the Dynflow UI for the composite task. See my other comment for the patch.
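A toy reproduction of the overwriting behavior, using an in-memory array as a stand-in for the dependency table (illustrative only, not the actual adapter code):

```ruby
# In-memory stand-in for the dependency table
table = []

# Upsert-style save: if a row matching `condition` exists, update it in place,
# which collapses multiple dependencies of one plan into a single row
def save_upsert(table, condition, value)
  existing = table.find { |row| condition.all? { |k, v| row[k] == v } }
  existing ? existing.merge!(value) : table << value.dup
end

# Plain insert: always adds a new row, allowing several deps per plan
def plain_insert(table, value)
  table << value.dup
end

save_upsert(table, { execution_plan_uuid: "D" }, { execution_plan_uuid: "D", blocked_by_uuid: "A" })
save_upsert(table, { execution_plan_uuid: "D" }, { execution_plan_uuid: "D", blocked_by_uuid: "B" })
table.size # => 1, the second dependency overwrote the first

table.clear
plain_insert(table, { execution_plan_uuid: "D", blocked_by_uuid: "A" })
plain_insert(table, { execution_plan_uuid: "D", blocked_by_uuid: "B" })
table.size # => 2, both dependencies survive
```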

@ianballou ianballou left a comment

Nice, this is looking good now! I verified that multiple dependencies can exist at the same time.

This commit enables execution plans to be chained. Assuming there is an
execution plan EP1, another execution plan EP2 can be chained onto EP1.  When
chained, EP2 will stay in scheduled state until EP1 goes to stopped state. An
execution plan can be chained onto multiple prerequisite execution plans, in
which case it will be run once all the prerequisite execution plans are stopped
and have not failed. If the prerequisite execution plan ends with
stopped-error, the chained execution plan(s) will fail. If the prerequisite
execution plan is halted, the chained execution plan(s) will be run.

It builds on mechanisms which were already present. When an execution
plan is chained, it behaves in the same way as if it was scheduled for
future execution. A record is created in dynflow_delayed_table and once
the conditions for it to execute are right, it is dispatched by the
delayed executor. Because of this, there might be a small delay between
when the prerequisites finish and the chained plan is started.
@adamruzicka adamruzicka merged commit 5538dd4 into Dynflow:master Dec 18, 2025
12 checks passed
@adamruzicka
Contributor Author

Thank you @ianballou & @sjha4 !

@adamruzicka adamruzicka deleted the chaining branch December 18, 2025 11:22