Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/chunked_output_benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def main_loop
if output[:current] < input[:limit]
consumed = yield
output[:current] += consumed
plan_event(nil)
plan_event(nil, untracked: true)
suspend
end
end
Expand Down
4 changes: 2 additions & 2 deletions examples/orchestrate.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative 'example_helper'

example_description = <<DESC
Orchestrate Example
===================
Expand All @@ -21,8 +23,6 @@

DESC

require_relative 'example_helper'

module Orchestrate

class CreateInfrastructure < Dynflow::Action
Expand Down
2 changes: 1 addition & 1 deletion examples/orchestrate_evented.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run(event = nil)
# do nothing
end),
(on nil do
suspend { |suspended_action| world.clock.ping suspended_action, rand(1), Finished }
suspend { |suspended_action| world.clock.ping suspended_action, rand(1), Finished, untracked: true }
end))
end

Expand Down
7 changes: 4 additions & 3 deletions lib/dynflow/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def to_s_humanized
step_id: Integer,
event: Object,
time: type { variants Time, NilClass },
optional: Algebrick::Types::Boolean
optional: Algebrick::Types::Boolean,
untracked: Algebrick::Types::Boolean
end

def self.constantize(action_name)
Expand Down Expand Up @@ -343,9 +344,9 @@ def queue

# Plan an +event+ to be send to the action defined by +action+, what defaults to be self.
# if +time+ is not passed, event is sent as soon as possible.
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false)
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false, untracked: false)
time = @world.clock.current_time + time if time.is_a?(Numeric)
delayed_events << DelayedEvent[execution_plan_id, step_id, event, time, optional]
delayed_events << DelayedEvent[execution_plan_id, step_id, event, time, optional, untracked]
end

def delayed_events
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/action/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def resume_external_action
end

def suspend_and_ping
plan_event(Poll, poll_interval)
plan_event(Poll, poll_interval, untracked: true)
suspend
end

Expand Down
4 changes: 2 additions & 2 deletions lib/dynflow/action/suspended.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def initialize(action)
@step_id = action.run_step_id
end

def plan_event(event, time, sent = Concurrent::Promises.resolvable_future, optional: false)
@world.plan_event(execution_plan_id, step_id, event, time, sent, optional: optional)
def plan_event(event, time, sent = Concurrent::Promises.resolvable_future, optional: false, untracked: false)
@world.plan_event(execution_plan_id, step_id, event, time, sent, optional: optional, untracked: untracked)
end

def event(event, sent = Concurrent::Promises.resolvable_future, optional: false)
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/action/timeouts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def process_timeout
end

def schedule_timeout(seconds, optional: false)
plan_event(Timeout, seconds, optional: optional)
plan_event(Timeout, seconds, optional: optional, untracked: true)
end
end
end
2 changes: 1 addition & 1 deletion lib/dynflow/action/with_bulk_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def run_progress
def spawn_plans
super
ensure
plan_event(PlanNextBatch)
plan_event(PlanNextBatch, untracked: true)
end

def cancel!(force = false)
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/action/with_polling_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,9 @@ def recalculate_counts
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success)
end

def can_fire_and_forget_sub_plans?
true
end
end
end
7 changes: 6 additions & 1 deletion lib/dynflow/action/with_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def trigger(action_class, *args)
if uses_concurrency_control
trigger_with_concurrency_control(action_class, *args)
else
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
method = can_fire_and_forget_sub_plans? ? :trigger_untracked : :trigger
world.public_send(:trigger) { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end
end

Expand Down Expand Up @@ -234,5 +235,9 @@ def check_for_errors!
def uses_concurrency_control
@uses_concurrency_control = input.key? :concurrency_control
end

def can_fire_and_forget_sub_plans?
false
end
end
end
4 changes: 2 additions & 2 deletions lib/dynflow/clock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ def current_time
Time.now
end

def ping(who, time, with_what = nil, where = :<<, optional: false)
def ping(who, time, with_what = nil, where = :<<, optional: false, untracked: false)
Type! time, Time, Numeric
time = current_time + time if time.is_a? Numeric
if who.is_a?(Action::Suspended)
who.plan_event(with_what, time, optional: optional)
who.plan_event(with_what, time, optional: optional, untracked: untracked)
else
timer = Clock::Timer[who, time, with_what.nil? ? Algebrick::Types::None : Some[Object][with_what], where]
self.tell([:add_timer, timer])
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/connectors/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def send_envelope(envelope)
end

def update_receiver_id(envelope, new_receiver_id)
Dispatcher::Envelope[envelope.request_id, envelope.sender_id, new_receiver_id, envelope.message]
Dispatcher::Envelope[envelope.request_id, envelope.sender_id, new_receiver_id, envelope.message, envelope.untracked]
end

def find_receiver(envelope)
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/director.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Director
include Algebrick::TypeCheck

Event = Algebrick.type do
fields! request_id: String,
fields! request_id: type { variants NilClass, String },
execution_plan_id: String,
step_id: Integer,
event: Object,
Expand Down
8 changes: 5 additions & 3 deletions lib/dynflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ module Dispatcher
end

Envelope = Algebrick.type do
fields! request_id: String,
fields! request_id: type { variants NilClass, String },
sender_id: String,
receiver_id: type { variants String, AnyExecutor = atom, UnknownWorld = atom },
message: type { variants Request, Response }
message: type { variants Request, Response },
untracked: type { variants TrueClass, FalseClass }
end

module Envelope
def build_response_envelope(response_message, sender)
Envelope[self.request_id,
sender.id,
self.sender_id,
response_message]
response_message,
false]
end
end

Expand Down
2 changes: 2 additions & 0 deletions lib/dynflow/dispatcher/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ def connector
end

def respond(request_envelope, response)
return if request_envelope.untracked

response_envelope = request_envelope.build_response_envelope(response, @world)
connector.send(response_envelope)
end
Expand Down
24 changes: 16 additions & 8 deletions lib/dynflow/dispatcher/client_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ module Dispatcher
class ClientDispatcher < Abstract

TrackedRequest = Algebrick.type do
fields! id: String, request: Request,
accepted: Concurrent::Promises::ResolvableFuture, finished: Concurrent::Promises::ResolvableFuture
fields! id: type { variants String, NilClass },
request: Request,
accepted: type { variants NilClass, Concurrent::Promises::ResolvableFuture },
finished: type { variants NilClass, Concurrent::Promises::ResolvableFuture }
end

module TrackedRequest
Expand Down Expand Up @@ -112,10 +114,10 @@ def initialize(world, ping_cache_age)
@ping_cache = PingCache.new world, ping_cache_age
end

def publish_request(future, request, timeout)
def publish_request(future, request, timeout, untracked)
with_ping_request_caching(request, future) do
track_request(future, request, timeout) do |tracked_request|
dispatch_request(request, @world.id, tracked_request.id)
track_request(future, request, timeout, untracked) do |tracked_request|
dispatch_request(request, @world.id, tracked_request.id, untracked)
end
end
end
Expand All @@ -131,7 +133,7 @@ def start_termination(*args)
finish_termination
end

def dispatch_request(request, client_world_id, request_id)
def dispatch_request(request, client_world_id, request_id, untracked)
ignore_unknown = false
executor_id = match request,
(on ~Execution | ~Planning do |execution|
Expand All @@ -144,7 +146,7 @@ def dispatch_request(request, client_world_id, request_id)
(on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _|
receiver_id
end)
envelope = Envelope[request_id, client_world_id, executor_id, request]
envelope = Envelope[request_id, client_world_id, executor_id, request, untracked]
if Dispatcher::UnknownWorld === envelope.receiver_id
raise Dynflow::Error, "Could not find an executor for #{envelope}" unless ignore_unknown

Expand Down Expand Up @@ -203,9 +205,15 @@ def find_executor(execution_plan_id)
Dispatcher::UnknownWorld
end

def track_request(finished, request, timeout)
def track_request(finished, request, timeout, untracked)
id_suffix = @last_id_suffix += 1
id = "#{@world.id}-#{id_suffix}"

if untracked
yield TrackedRequest[id, request, nil, finished]
return
end

tracked_request = TrackedRequest[id, request, Concurrent::Promises.resolvable_future, finished]
@tracked_requests[id] = tracked_request
@world.clock.ping(self, timeout, [:timeout, id]) if timeout
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/executors/abstract/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def handle_planning(execution_plan_id)

def plan_events(delayed_events)
delayed_events.each do |event|
@world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional)
@world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional, untracked: event.untracked)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/testing/in_thread_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def process_work_items

def plan_events(delayed_events)
delayed_events.each do |event|
@world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time)
@world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional, untracked: event.untracked)
end
end

Expand Down
29 changes: 23 additions & 6 deletions lib/dynflow/world.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,23 @@ def trigger(action_class = nil, *args, &block)
end
end

def trigger_untracked(action_class = nil, *args, &block)
if action_class.nil?
raise 'Neither action_class nor a block given' if block.nil?
execution_plan = block.call(self)
else
execution_plan = plan(action_class, *args)
end
planned = execution_plan.state == :planned

if planned
done = execute(execution_plan.id, Concurrent::Promises.resolvable_future, untracked: true)
Scheduled[execution_plan.id]
else
PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
end
end

def delay(action_class, delay_options, *args)
delay_with_options(action_class: action_class, args: args, delay_options: delay_options)
end
Expand Down Expand Up @@ -223,16 +240,16 @@ def plan_with_options(action_class:, args:, id: nil, caller_action: nil)

# @return [Concurrent::Promises::ResolvableFuture] containing execution_plan when finished
# raises when ExecutionPlan is not accepted for execution
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
publish_request(Dispatcher::Execution[execution_plan_id], done, true)
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future, untracked: false)
publish_request(Dispatcher::Execution[execution_plan_id], done, true, untracked: untracked)
end

def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false)
publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false)
end

def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false)
publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false)
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false, untracked: false)
publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false, untracked: untracked)
end

def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future)
Expand All @@ -251,12 +268,12 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent
publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout)
end

def publish_request(request, done, wait_for_accepted, timeout = nil)
def publish_request(request, done, wait_for_accepted, timeout = nil, untracked: false)
accepted = Concurrent::Promises.resolvable_future
accepted.rescue do |reason|
done.reject reason if reason
end
client_dispatcher.ask([:publish_request, done, request, timeout], accepted)
client_dispatcher.ask([:publish_request, done, request, timeout, untracked], accepted)
accepted.wait if wait_for_accepted
done
rescue => e
Expand Down
3 changes: 2 additions & 1 deletion lib/dynflow/world/invalidation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def invalidate_execution_lock(execution_lock)
client_dispatcher.tell([:dispatch_request,
Dispatcher::Execution[execution_lock.execution_plan_id],
execution_lock.client_world_id,
execution_lock.request_id])
execution_lock.request_id,
true])
end
end
end
Expand Down