Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e83e465
feat(schema-polling): implement initial schema fetch and improve poll…
slimee Dec 11, 2025
109cd32
feat(schema-polling): enhance schema polling with initial synchronous…
slimee Dec 11, 2025
faa6fb3
fix(schema polling): improve initial polling behavior and logging
slimee Dec 11, 2025
967d9bd
fix(rpc): fix RuboCop offenses in schema_polling_client
slimee Dec 11, 2025
0af3bc6
chore: fix typo
slimee Dec 12, 2025
74e8835
chore: merge main into perf/improve-schema-polling
slimee Dec 12, 2025
132044b
fix(rpc): remove redundant assignment in build method
slimee Dec 12, 2025
6215deb
fix(schema-polling): move digest require to top of file
slimee Dec 12, 2025
503b38a
chore: trigger CI rerun
slimee Dec 12, 2025
408e75e
fix(schema): update polling interval option naming and improve ETag h…
slimee Dec 12, 2025
e31919d
fix(schema-polling): improve initial sync handling and error reportin…
slimee Dec 12, 2025
b65252d
refactor(schema polling): update log levels for schema sync completio…
slimee Dec 12, 2025
d54d4d2
fix(schema_polling): remove unnecessary whitespace in initial_sync_co…
slimee Dec 12, 2025
36e2b48
fix(schema polling): trigger schema update callback only after initia…
slimee Dec 12, 2025
63c3956
feat(datasource): add graceful shutdown hook for schema polling cleanup
slimee Dec 12, 2025
ac06b19
refactor(schema_polling): improve thread stopping mechanism to avoid …
slimee Dec 12, 2025
9115481
fix(schema_polling): ensure graceful shutdown of polling thread
slimee Dec 12, 2025
ab74679
fix: call only one time the get schema at start
arnaud-moncel Dec 15, 2025
4053f38
fix: call with introspection
arnaud-moncel Dec 15, 2025
f62484a
chore: improve polling
arnaud-moncel Dec 15, 2025
af3e857
fix: polling start
arnaud-moncel Dec 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ class Error < StandardError; end
# @param options [Hash] Configuration options
# @option options [String] :uri The URI of the RPC agent
# @option options [String] :auth_secret The authentication secret (optional, will use cache if not provided)
# @option options [Integer] :schema_polling_interval Polling interval in seconds (optional)
# @option options [Integer] :schema_polling_interval_sec Polling interval in seconds (optional)
# - Default: 600 seconds (10 minutes)
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL']
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL_SEC']
# - Valid range: 1-3600 seconds
# - Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default
# - Example: SCHEMA_POLLING_INTERVAL=30 for development (30 seconds)
# - Priority: options[:schema_polling_interval_sec] > ENV['SCHEMA_POLLING_INTERVAL_SEC'] > default
# - Example: SCHEMA_POLLING_INTERVAL_SEC=30 for development (30 seconds)
# @option options [Hash] :introspection Pre-defined schema introspection for resilient deployment
# - When provided, allows the datasource to start even if the RPC slave is unreachable
# - The introspection will be used as fallback when the slave connection fails
Expand All @@ -29,36 +29,35 @@ def self.build(options)
uri = options[:uri]
auth_secret = options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
provided_introspection = options[:introspection]
ForestAdminAgent::Facades::Container.logger.log('Info', "Getting schema from RPC agent on #{uri}.")

schema = nil
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval_sec] > ENV['SCHEMA_POLLING_INTERVAL_SEC'] > default (600)
polling_interval = if options[:schema_polling_interval_sec]
options[:schema_polling_interval_sec]
elsif ENV['SCHEMA_POLLING_INTERVAL_SEC']
ENV['SCHEMA_POLLING_INTERVAL_SEC'].to_i
else
600 # 10 minutes by default
end

begin
rpc_client = Utils::RpcClient.new(uri, auth_secret)
response = rpc_client.fetch_schema('/forest/rpc-schema')
schema = response.body
rescue Faraday::ConnectionFailed => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Connection failed to RPC agent at #{uri}: #{e.message}\n#{e.backtrace.join("\n")}"
)
rescue Faraday::TimeoutError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Request timeout to RPC agent at #{uri}: #{e.message}"
)
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Authentication failed with RPC agent at #{uri}: #{e.message}"
)
rescue StandardError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Failed to get schema from RPC agent at #{uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
)
polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options, provided_introspection) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end

# Start polling (includes initial synchronous schema fetch)
# The initial fetch is blocking, then async polling starts
schema_polling.start

# Get the schema from the polling client
schema = schema_polling.current_schema

# Use provided introspection as fallback when slave is unreachable
if schema.nil? && provided_introspection
ForestAdminAgent::Facades::Container.logger.log(
Expand All @@ -70,69 +69,16 @@ def self.build(options)
end

if schema.nil?
# return empty datasource for not breaking stack
ForestAdminDatasourceToolkit::Datasource.new
# FAIL FAST: If no schema is available and no introspection was provided,
# the application cannot function properly. Better to crash early than
# start with an empty datasource.
raise ForestAdminDatasourceToolkit::Exceptions::ForestException.new(
"Fatal: Unable to build RPC datasource for #{uri}. " \
"The RPC agent is unreachable and no introspection schema was provided. " \
"Please ensure the RPC agent is running or provide an introspection schema for resilient deployment."
)
else
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default (600)
polling_interval = if options[:schema_polling_interval]
options[:schema_polling_interval]
elsif ENV['SCHEMA_POLLING_INTERVAL']
ENV['SCHEMA_POLLING_INTERVAL'].to_i
else
600 # 10 minutes by default
end

polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end
schema_polling.start

datasource = ForestAdminDatasourceRpc::Datasource.new(options, schema, schema_polling)

# Setup cleanup hooks for proper schema polling client shutdown
setup_cleanup_hooks(datasource)

datasource
end
end

def self.setup_cleanup_hooks(datasource)
# Register cleanup handler for graceful shutdown
at_exit do
datasource.cleanup
rescue StandardError => e
# Silently ignore errors during exit cleanup to prevent test pollution
warn "[RPCDatasource] Error during at_exit cleanup: #{e.message}" if $VERBOSE
end

# Handle SIGINT (Ctrl+C)
Signal.trap('INT') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGINT, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
end

# Handle SIGTERM (default kill signal)
Signal.trap('TERM') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGTERM, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
ForestAdminDatasourceRpc::Datasource.new(options, schema, schema_polling)
end
end
end
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
require 'openssl'
require 'json'
require 'time'
require 'digest'

module ForestAdminDatasourceRpc
module Utils
class SchemaPollingClient
attr_reader :closed
attr_reader :closed, :current_schema

DEFAULT_POLLING_INTERVAL = 600 # seconds (10 minutes)
MIN_POLLING_INTERVAL = 1 # seconds (minimum safe interval)
MAX_POLLING_INTERVAL = 3600 # seconds (1 hour max)

def initialize(uri, auth_secret, options = {}, &on_schema_change)
def initialize(uri, auth_secret, options = {}, previous_schema = nil, &on_schema_change)
@uri = uri
@auth_secret = auth_secret
@polling_interval = options[:polling_interval] || DEFAULT_POLLING_INTERVAL
@on_schema_change = on_schema_change
@closed = false
@cached_etag = nil
@current_schema = previous_schema
@cached_etag = compute_etag(previous_schema) if previous_schema
@polling_thread = nil
@mutex = Mutex.new
@connection_attempts = 0
@initial_sync_completed = false # Track if we've successfully fetched from RPC agent

# Validate polling interval
validate_polling_interval!
Expand All @@ -29,12 +32,22 @@ def initialize(uri, auth_secret, options = {}, &on_schema_change)
@rpc_client = RpcClient.new(@uri, @auth_secret)
end

def start
return if @closed
# Start schema polling with initial synchronous fetch
# The first schema fetch is done synchronously (blocking) to ensure
# the schema is available immediately. Then async polling starts.
#
# @return [Boolean] true if started successfully, false if already running or closed
def start # rubocop:disable Naming/PredicateMethod
return false if @closed

@mutex.synchronize do
return if @polling_thread&.alive?
return false if @polling_thread&.alive?

# Fetch initial schema synchronously before starting the polling thread
ForestAdminAgent::Facades::Container.logger&.log('Info', "Getting schema from RPC agent on #{@uri}.")
fetch_initial_schema_sync

# Start async polling thread
@polling_thread = Thread.new do
polling_loop
rescue StandardError => e
Expand All @@ -49,6 +62,7 @@ def start
'Info',
"[Schema Polling] Polling started (interval: #{@polling_interval}s)"
)
true
end

def stop
Expand All @@ -69,31 +83,86 @@ def stop

private

# Derive an ETag for a schema by hashing its JSON serialization with SHA1.
# NOTE(review): presumably meant to match the ETag computed on the RPC agent
# side — confirm against the agent implementation.
#
# @param schema [Object, nil] the schema to hash (anything responding to #to_json)
# @return [String, nil] hex-encoded SHA1 of the schema JSON, or nil when schema is nil
def compute_etag(schema)
  unless schema.nil?
    serialized = schema.to_json
    Digest::SHA1.hexdigest(serialized)
  end
end

# Blocking first fetch of the schema from '/forest/rpc-schema'.
#
# On success it stores the response body in @current_schema, stores the
# response ETag (or one computed from the body when the response has none)
# in @cached_etag, and flags @initial_sync_completed.
#
# Failures never propagate from the fetch itself: each rescued error is
# logged at 'Error' level and the instance state is left as it was.
def fetch_initial_schema_sync
  # The message is supplied as a block so it is only built when a logger exists.
  emit = lambda do |level, &build_message|
    ForestAdminAgent::Facades::Container.logger&.log(level, build_message.call)
  end

  response = @rpc_client.fetch_schema('/forest/rpc-schema')
  schema = response.body
  @current_schema = schema
  @cached_etag = response.etag || compute_etag(schema)
  @initial_sync_completed = true
  emit.call('Debug') { "[Schema Polling] Initial schema fetched successfully (ETag: #{@cached_etag})" }
rescue Faraday::ConnectionFailed => e
  emit.call('Error') { "Connection failed to RPC agent at #{@uri}: #{e.message}\n#{e.backtrace.join("\n")}" }
rescue Faraday::TimeoutError => e
  emit.call('Error') { "Request timeout to RPC agent at #{@uri}: #{e.message}" }
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
  emit.call('Error') { "Authentication failed with RPC agent at #{@uri}: #{e.message}" }
rescue StandardError => e
  emit.call('Error') do
    "Failed to get schema from RPC agent at #{@uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
  end
end

# Main loop of the polling thread: repeatedly calls check_schema until
# @closed becomes truthy.
#
# The first iteration checks immediately; every later iteration first waits
# @polling_interval seconds via sleep_with_interrupt. Any StandardError
# raised by check_schema is passed to handle_error so the loop keeps running.
def polling_loop
  etag_status = @cached_etag ? "with ETag: #{@cached_etag}" : 'without initial ETag'
  # Single start-up message (the older variant without the ETag status is dropped).
  ForestAdminAgent::Facades::Container.logger&.log(
    'Debug',
    "[Schema Polling] Starting polling loop (interval: #{@polling_interval}s, #{etag_status})"
  )

  first_check = true

  loop do
    break if @closed

    # Wait before checking (skipped on the first iteration to start polling immediately)
    unless first_check
      etag_info = @cached_etag || 'none'
      ForestAdminAgent::Facades::Container.logger&.log(
        'Debug',
        "[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{etag_info})"
      )
      sleep_with_interrupt(@polling_interval)
      # The client may have been closed while we were waiting.
      break if @closed
    end
    first_check = false

    # Check for schema changes; one failed check must not end the loop.
    begin
      check_schema
    rescue StandardError => e
      handle_error(e)
    end
  end
end

# Sleep with interrupt check (check every second for early termination)
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{@cached_etag || "none"})"
)
remaining = @polling_interval
while remaining.positive? && !@closed
sleep([remaining, 1].min)
remaining -= 1
end
# Sleep for up to +duration+ seconds in slices of at most one second,
# re-checking @closed before every slice so a stop request cuts the wait short.
#
# @param duration [Numeric] total number of seconds to wait
# @return [nil]
def sleep_with_interrupt(duration)
  left = duration
  loop do
    break unless left.positive?
    break if @closed

    sleep(left > 1 ? 1 : left)
    left -= 1
  end
end

Expand Down Expand Up @@ -162,22 +231,29 @@ def handle_schema_unchanged
end

# React to a fetch result that carries a schema differing from the cached one.
#
# The ETag is taken from the result, or computed from the body when the
# result has none. Until a first fetch from the RPC agent has succeeded, the
# schema and ETag are only recorded and the reload callback is NOT triggered;
# afterwards every change is delegated to handle_schema_update.
# The connection-attempt counter is reset in both cases.
#
# The change is dispatched exactly once per call, so a change cannot be
# processed twice (which would fire the reload callback twice).
#
# @param result [#body, #etag] fetch result holding the new schema and its ETag
def handle_schema_changed(result)
  new_schema = result.body
  new_etag = result.etag || compute_etag(new_schema)

  if @initial_sync_completed
    # Schema update detected - trigger callback for reload
    handle_schema_update(new_schema, new_etag)
  else
    # First successful fetch from RPC agent - do NOT trigger callback
    @cached_etag = new_etag
    @current_schema = new_schema
    @initial_sync_completed = true
    ForestAdminAgent::Facades::Container.logger&.log(
      'Info',
      "[Schema Polling] Initial sync completed successfully (ETag: #{new_etag})"
    )
  end
  @connection_attempts = 0
end

# Remember the ETag of the first schema that was loaded and log it at 'Debug' level.
#
# @param etag [String, nil] ETag to store in @cached_etag
def handle_initial_schema(etag)
  @cached_etag = etag

  logger = ForestAdminAgent::Facades::Container.logger
  logger&.log('Debug', "[Schema Polling] Initial schema loaded successfully (ETag: #{etag})")
end

def handle_schema_update(schema, etag)
old_etag = @cached_etag
@cached_etag = etag
@current_schema = schema
msg = "[Schema Polling] Schema changed detected (old ETag: #{old_etag}, new ETag: #{etag}), " \
'triggering reload callback'
ForestAdminAgent::Facades::Container.logger&.log('Info', msg)
Expand Down
Loading
Loading