diff --git a/examples/chunked_output_benchmark.rb b/examples/chunked_output_benchmark.rb index 3aa3a3659..237b1c91b 100644 --- a/examples/chunked_output_benchmark.rb +++ b/examples/chunked_output_benchmark.rb @@ -12,7 +12,7 @@ def main_loop if output[:current] < input[:limit] consumed = yield output[:current] += consumed - plan_event(nil) + plan_event(nil, untracked: true) suspend end end diff --git a/examples/orchestrate.rb b/examples/orchestrate.rb index fdb3a42c6..d37990211 100755 --- a/examples/orchestrate.rb +++ b/examples/orchestrate.rb @@ -1,6 +1,8 @@ #!/usr/bin/env ruby # frozen_string_literal: true +require_relative 'example_helper' + example_description = < failed - output.fetch(:resumed_count, 0), :success_count => success) end + + def can_fire_and_forget_sub_plans? + true + end end end diff --git a/lib/dynflow/action/with_sub_plans.rb b/lib/dynflow/action/with_sub_plans.rb index 3be37e69a..5e3d31451 100644 --- a/lib/dynflow/action/with_sub_plans.rb +++ b/lib/dynflow/action/with_sub_plans.rb @@ -81,7 +81,8 @@ def trigger(action_class, *args) if uses_concurrency_control trigger_with_concurrency_control(action_class, *args) else - world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) } + method = can_fire_and_forget_sub_plans? ? :trigger_untracked : :trigger + world.public_send(:trigger) { world.plan_with_options(action_class: action_class, args: args, caller_action: self) } end end @@ -234,5 +235,9 @@ def check_for_errors! def uses_concurrency_control @uses_concurrency_control = input.key? :concurrency_control end + + def can_fire_and_forget_sub_plans? + false + end end end diff --git a/lib/dynflow/clock.rb b/lib/dynflow/clock.rb index 29ff6bd9a..3362c6450 100644 --- a/lib/dynflow/clock.rb +++ b/lib/dynflow/clock.rb @@ -114,11 +114,11 @@ def current_time Time.now end - def ping(who, time, with_what = nil, where = :<<, optional: false) + def ping(who, time, with_what = nil, where = :<<, optional: false, untracked: false) Type! time, Time, Numeric time = current_time + time if time.is_a? Numeric if who.is_a?(Action::Suspended) - who.plan_event(with_what, time, optional: optional) + who.plan_event(with_what, time, optional: optional, untracked: untracked) else timer = Clock::Timer[who, time, with_what.nil? ? Algebrick::Types::None : Some[Object][with_what], where] self.tell([:add_timer, timer]) diff --git a/lib/dynflow/connectors/database.rb b/lib/dynflow/connectors/database.rb index 0c231abbe..e4ddecfe2 100644 --- a/lib/dynflow/connectors/database.rb +++ b/lib/dynflow/connectors/database.rb @@ -127,7 +127,7 @@ def send_envelope(envelope) end def update_receiver_id(envelope, new_receiver_id) - Dispatcher::Envelope[envelope.request_id, envelope.sender_id, new_receiver_id, envelope.message] + Dispatcher::Envelope[envelope.request_id, envelope.sender_id, new_receiver_id, envelope.message, envelope.untracked] end def find_receiver(envelope) diff --git a/lib/dynflow/director.rb b/lib/dynflow/director.rb index 3fc735985..82676c3d4 100644 --- a/lib/dynflow/director.rb +++ b/lib/dynflow/director.rb @@ -11,7 +11,7 @@ class Director include Algebrick::TypeCheck Event = Algebrick.type do - fields! request_id: String, + fields! request_id: type { variants NilClass, String }, execution_plan_id: String, step_id: Integer, event: Object, diff --git a/lib/dynflow/dispatcher.rb b/lib/dynflow/dispatcher.rb index dfea6e504..aef1771c4 100644 --- a/lib/dynflow/dispatcher.rb +++ b/lib/dynflow/dispatcher.rb @@ -40,10 +40,11 @@ module Dispatcher end Envelope = Algebrick.type do - fields! request_id: String, + fields! request_id: type { variants NilClass, String }, sender_id: String, receiver_id: type { variants String, AnyExecutor = atom, UnknownWorld = atom }, - message: type { variants Request, Response } + message: type { variants Request, Response }, + untracked: type { variants TrueClass, FalseClass } end module Envelope @@ -51,7 +52,8 @@ def build_response_envelope(response_message, sender) Envelope[self.request_id, sender.id, self.sender_id, - response_message] + response_message, + false] end end diff --git a/lib/dynflow/dispatcher/abstract.rb b/lib/dynflow/dispatcher/abstract.rb index 0decdea45..5e89e8f85 100644 --- a/lib/dynflow/dispatcher/abstract.rb +++ b/lib/dynflow/dispatcher/abstract.rb @@ -7,6 +7,8 @@ def connector end def respond(request_envelope, response) + return if request_envelope.untracked + response_envelope = request_envelope.build_response_envelope(response, @world) connector.send(response_envelope) end diff --git a/lib/dynflow/dispatcher/client_dispatcher.rb b/lib/dynflow/dispatcher/client_dispatcher.rb index fc436660f..8f1601728 100644 --- a/lib/dynflow/dispatcher/client_dispatcher.rb +++ b/lib/dynflow/dispatcher/client_dispatcher.rb @@ -4,8 +4,10 @@ module Dispatcher class ClientDispatcher < Abstract TrackedRequest = Algebrick.type do - fields! id: String, request: Request, - accepted: Concurrent::Promises::ResolvableFuture, finished: Concurrent::Promises::ResolvableFuture + fields! id: type { variants String, NilClass }, + request: Request, + accepted: type { variants NilClass, Concurrent::Promises::ResolvableFuture }, + finished: type { variants NilClass, Concurrent::Promises::ResolvableFuture } end module TrackedRequest @@ -112,10 +114,10 @@ def initialize(world, ping_cache_age) @ping_cache = PingCache.new world, ping_cache_age end - def publish_request(future, request, timeout) + def publish_request(future, request, timeout, untracked) with_ping_request_caching(request, future) do - track_request(future, request, timeout) do |tracked_request| - dispatch_request(request, @world.id, tracked_request.id) + track_request(future, request, timeout, untracked) do |tracked_request| + dispatch_request(request, @world.id, tracked_request.id, untracked) end end end @@ -131,7 +133,7 @@ def start_termination(*args) finish_termination end - def dispatch_request(request, client_world_id, request_id) + def dispatch_request(request, client_world_id, request_id, untracked) ignore_unknown = false executor_id = match request, (on ~Execution | ~Planning do |execution| @@ -144,7 +146,7 @@ def dispatch_request(request, client_world_id, request_id) (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _| receiver_id end) - envelope = Envelope[request_id, client_world_id, executor_id, request] + envelope = Envelope[request_id, client_world_id, executor_id, request, untracked] if Dispatcher::UnknownWorld === envelope.receiver_id raise Dynflow::Error, "Could not find an executor for #{envelope}" unless ignore_unknown @@ -203,9 +205,15 @@ def find_executor(execution_plan_id) Dispatcher::UnknownWorld end - def track_request(finished, request, timeout) + def track_request(finished, request, timeout, untracked) id_suffix = @last_id_suffix += 1 id = "#{@world.id}-#{id_suffix}" + + if untracked + yield TrackedRequest[id, request, nil, finished] + return + end + tracked_request = TrackedRequest[id, request, Concurrent::Promises.resolvable_future, finished] @tracked_requests[id] = tracked_request @world.clock.ping(self, timeout, [:timeout, id]) if timeout diff --git a/lib/dynflow/executors/abstract/core.rb b/lib/dynflow/executors/abstract/core.rb index cbac0568d..e56416c4c 100644 --- a/lib/dynflow/executors/abstract/core.rb +++ b/lib/dynflow/executors/abstract/core.rb @@ -46,7 +46,7 @@ def handle_planning(execution_plan_id) def plan_events(delayed_events) delayed_events.each do |event| - @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional) + @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional, untracked: event.untracked) end end diff --git a/lib/dynflow/testing/in_thread_executor.rb b/lib/dynflow/testing/in_thread_executor.rb index e73a7c4c1..bcdfca9a5 100644 --- a/lib/dynflow/testing/in_thread_executor.rb +++ b/lib/dynflow/testing/in_thread_executor.rb @@ -23,7 +23,7 @@ def process_work_items def plan_events(delayed_events) delayed_events.each do |event| - @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time) + @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional, untracked: event.untracked) end end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index d18458999..9400dbc75 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -189,6 +189,23 @@ def trigger(action_class = nil, *args, &block) end end + def trigger_untracked(action_class = nil, *args, &block) + if action_class.nil? + raise 'Neither action_class nor a block given' if block.nil? + execution_plan = block.call(self) + else + execution_plan = plan(action_class, *args) + end + planned = execution_plan.state == :planned + + if planned + done = execute(execution_plan.id, Concurrent::Promises.resolvable_future, untracked: true) + Scheduled[execution_plan.id] + else + PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] + end + end + def delay(action_class, delay_options, *args) delay_with_options(action_class: action_class, args: args, delay_options: delay_options) end @@ -223,16 +240,16 @@ def plan_with_options(action_class:, args:, id: nil, caller_action: nil) # @return [Concurrent::Promises::ResolvableFuture] containing execution_plan when finished # raises when ExecutionPlan is not accepted for execution - def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) - publish_request(Dispatcher::Execution[execution_plan_id], done, true) + def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future, untracked: false) + publish_request(Dispatcher::Execution[execution_plan_id], done, true, untracked: untracked) end def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false) end - def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) - publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false) + def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false, untracked: false) + publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false, untracked: untracked) end def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) @@ -251,12 +268,12 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end - def publish_request(request, done, wait_for_accepted, timeout = nil) + def publish_request(request, done, wait_for_accepted, timeout = nil, untracked: false) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end - client_dispatcher.ask([:publish_request, done, request, timeout], accepted) + client_dispatcher.ask([:publish_request, done, request, timeout, untracked], accepted) accepted.wait if wait_for_accepted done rescue => e diff --git a/lib/dynflow/world/invalidation.rb b/lib/dynflow/world/invalidation.rb index 7e1234b9a..898e6eb83 100644 --- a/lib/dynflow/world/invalidation.rb +++ b/lib/dynflow/world/invalidation.rb @@ -71,7 +71,8 @@ def invalidate_execution_lock(execution_lock) client_dispatcher.tell([:dispatch_request, Dispatcher::Execution[execution_lock.execution_plan_id], execution_lock.client_world_id, - execution_lock.request_id]) + execution_lock.request_id, + true]) end end end