Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e83e465
feat(schema-polling): implement initial schema fetch and improve poll…
slimee Dec 11, 2025
109cd32
feat(schema-polling): enhance schema polling with initial synchronous…
slimee Dec 11, 2025
faa6fb3
fix(schema polling): improve initial polling behavior and logging
slimee Dec 11, 2025
967d9bd
fix(rpc): fix RuboCop offenses in schema_polling_client
slimee Dec 11, 2025
0af3bc6
chore: fix typo
slimee Dec 12, 2025
74e8835
chore: merge main into perf/improve-schema-polling
slimee Dec 12, 2025
132044b
fix(rpc): remove redundant assignment in build method
slimee Dec 12, 2025
6215deb
fix(schema-polling): move digest require to top of file
slimee Dec 12, 2025
503b38a
chore: trigger CI rerun
slimee Dec 12, 2025
408e75e
fix(schema): update polling interval option naming and improve ETag h…
slimee Dec 12, 2025
e31919d
fix(schema-polling): improve initial sync handling and error reportin…
slimee Dec 12, 2025
b65252d
refactor(schema polling): update log levels for schema sync completio…
slimee Dec 12, 2025
d54d4d2
fix(schema_polling): remove unnecessary whitespace in initial_sync_co…
slimee Dec 12, 2025
36e2b48
fix(schema polling): trigger schema update callback only after initia…
slimee Dec 12, 2025
63c3956
feat(datasource): add graceful shutdown hook for schema polling cleanup
slimee Dec 12, 2025
ac06b19
refactor(schema_polling): improve thread stopping mechanism to avoid …
slimee Dec 12, 2025
9115481
fix(schema_polling): ensure graceful shutdown of polling thread
slimee Dec 12, 2025
ab74679
fix: call only one time the get schema at start
arnaud-moncel Dec 15, 2025
4053f38
fix: call with introspection
arnaud-moncel Dec 15, 2025
f62484a
chore: improve polling
arnaud-moncel Dec 15, 2025
af3e857
fix: polling start
arnaud-moncel Dec 16, 2025
cf38c5e
feat: shared thread pool for polling
Dec 17, 2025
409cf5f
fix: clean up
Dec 17, 2025
162284b
fix: add options for thread pool
Dec 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 22 additions & 118 deletions packages/forest_admin_datasource_rpc/lib/forest_admin_datasource_rpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,131 +8,35 @@
module ForestAdminDatasourceRpc
class Error < StandardError; end

# Build a RPC datasource with schema polling enabled.
#
# @param options [Hash] Configuration options
# @option options [String] :uri The URI of the RPC agent
# @option options [String] :auth_secret The authentication secret (optional, will use cache if not provided)
# @option options [Integer] :schema_polling_interval Polling interval in seconds (optional)
# - Default: 600 seconds (10 minutes)
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL']
# - Valid range: 1-3600 seconds
# - Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default
# - Example: SCHEMA_POLLING_INTERVAL=30 for development (30 seconds)
# @option options [Hash] :introspection Pre-defined schema introspection for resilient deployment
# - When provided, allows the datasource to start even if the RPC slave is unreachable
# - The introspection will be used as fallback when the slave connection fails
# - Schema polling will still be enabled to pick up changes when the slave becomes available
#
# @return [ForestAdminDatasourceRpc::Datasource] The configured datasource with schema polling
def self.build(options)
uri = options[:uri]
auth_secret = options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
provided_introspection = options[:introspection]
ForestAdminAgent::Facades::Container.logger.log('Info', "Getting schema from RPC agent on #{uri}.")

schema = nil

begin
rpc_client = Utils::RpcClient.new(uri, auth_secret)
response = rpc_client.fetch_schema('/forest/rpc-schema')
schema = response.body
rescue Faraday::ConnectionFailed => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Connection failed to RPC agent at #{uri}: #{e.message}\n#{e.backtrace.join("\n")}"
)
rescue Faraday::TimeoutError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Request timeout to RPC agent at #{uri}: #{e.message}"
)
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Authentication failed with RPC agent at #{uri}: #{e.message}"
)
rescue StandardError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Failed to get schema from RPC agent at #{uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
)
end

# Use provided introspection as fallback when slave is unreachable
if schema.nil? && provided_introspection
ForestAdminAgent::Facades::Container.logger.log(
'Warn',
"RPC agent at #{uri} is unreachable, using provided introspection for resilient deployment."
)
options.delete(:introspection)
schema = provided_introspection
polling_interval = if options[:schema_polling_interval_sec]
options[:schema_polling_interval_sec]
elsif ENV['SCHEMA_POLLING_INTERVAL_SEC']
ENV['SCHEMA_POLLING_INTERVAL_SEC'].to_i
else
600
end

polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options, provided_introspection) do
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end

if schema.nil?
# return empty datasource for not breaking stack
ForestAdminDatasourceToolkit::Datasource.new
else
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default (600)
polling_interval = if options[:schema_polling_interval]
options[:schema_polling_interval]
elsif ENV['SCHEMA_POLLING_INTERVAL']
ENV['SCHEMA_POLLING_INTERVAL'].to_i
else
600 # 10 minutes by default
end

polling_options = {
polling_interval: polling_interval
}
# Start polling (includes initial synchronous schema fetch)
# - Without introspection: crashes if RPC is unreachable
# - With introspection: falls back to introspection if RPC is unreachable
schema_polling.start

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end
schema_polling.start

datasource = ForestAdminDatasourceRpc::Datasource.new(options, schema, schema_polling)

# Setup cleanup hooks for proper schema polling client shutdown
setup_cleanup_hooks(datasource)

datasource
end
end

def self.setup_cleanup_hooks(datasource)
# Register cleanup handler for graceful shutdown
at_exit do
datasource.cleanup
rescue StandardError => e
# Silently ignore errors during exit cleanup to prevent test pollution
warn "[RPCDatasource] Error during at_exit cleanup: #{e.message}" if $VERBOSE
end

# Handle SIGINT (Ctrl+C)
Signal.trap('INT') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGINT, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
end

# Handle SIGTERM (default kill signal)
Signal.trap('TERM') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGTERM, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
end
@provided_introspection = nil
ForestAdminDatasourceRpc::Datasource.new(options, schema_polling.current_schema, schema_polling)
end
end
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
require 'openssl'
require 'json'
require 'time'
require 'digest'

module ForestAdminDatasourceRpc
module Utils
class SchemaPollingClient
attr_reader :closed
attr_reader :closed, :current_schema

DEFAULT_POLLING_INTERVAL = 600 # seconds (10 minutes)
MIN_POLLING_INTERVAL = 1 # seconds (minimum safe interval)
MAX_POLLING_INTERVAL = 3600 # seconds (1 hour max)
DEFAULT_POLLING_INTERVAL = 600
MIN_POLLING_INTERVAL = 1
MAX_POLLING_INTERVAL = 3600

def initialize(uri, auth_secret, options = {}, &on_schema_change)
def initialize(uri, auth_secret, options = {}, introspection_schema = nil, &on_schema_change)
@uri = uri
@auth_secret = auth_secret
@polling_interval = options[:polling_interval] || DEFAULT_POLLING_INTERVAL
@on_schema_change = on_schema_change
@closed = false
@introspection_schema = introspection_schema
@current_schema = nil
@cached_etag = nil
@polling_thread = nil
@mutex = Mutex.new
@connection_attempts = 0
@initial_sync_completed = false

# Validate polling interval
validate_polling_interval!

# RPC client for schema fetching with ETag support
@rpc_client = RpcClient.new(@uri, @auth_secret)
end

def start
return if @closed
def start # rubocop:disable Naming/PredicateMethod
return false if @closed

@mutex.synchronize do
return if @polling_thread&.alive?
return false if @polling_thread&.alive?

# Always try to fetch initial schema from RPC agent
ForestAdminAgent::Facades::Container.logger&.log('Info', "Getting schema from RPC agent on #{@uri}.")
fetch_initial_schema_sync

# Start async polling thread
@polling_thread = Thread.new do
polling_loop
rescue StandardError => e
Expand All @@ -49,6 +56,7 @@ def start
'Info',
"[Schema Polling] Polling started (interval: #{@polling_interval}s)"
)
true
end

def stop
Expand All @@ -57,43 +65,104 @@ def stop
@closed = true
ForestAdminAgent::Facades::Container.logger&.log('Debug', '[Schema Polling] Stopping polling')

@mutex.synchronize do
if @polling_thread&.alive?
@polling_thread.kill
@polling_thread = nil
end
end
@polling_thread.join(2) if @polling_thread&.alive?
@polling_thread = nil

ForestAdminAgent::Facades::Container.logger&.log('Debug', '[Schema Polling] Polling stopped')
end

private

def compute_etag(schema)
return nil if schema.nil?

Digest::SHA1.hexdigest(schema.to_json)
end

def fetch_initial_schema_sync
# If we have an introspection schema, send its ETag to avoid re-downloading unchanged schema
introspection_etag = compute_etag(@introspection_schema) if @introspection_schema
result = @rpc_client.fetch_schema('/forest/rpc-schema', if_none_match: introspection_etag)

if result == RpcClient::NotModified
# Schema unchanged from introspection - use introspection
@current_schema = @introspection_schema
@cached_etag = introspection_etag
@initial_sync_completed = true
ForestAdminAgent::Facades::Container.logger&.log(
'Info',
"[Schema Polling] RPC schema unchanged (HTTP 304), using introspection (ETag: #{@cached_etag})"
)
else
# New schema from RPC
@current_schema = result.body
@cached_etag = result.etag || compute_etag(@current_schema)
@initial_sync_completed = true
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Initial schema fetched successfully (ETag: #{@cached_etag})"
)
end

@introspection_schema = nil
rescue Faraday::ConnectionFailed, Faraday::TimeoutError,
ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient, StandardError => e
handle_initial_fetch_error(e)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function with high complexity (count = 5): fetch_initial_schema_sync [qlty:function-complexity]

end

def handle_initial_fetch_error(error)
if @introspection_schema
# Fallback to introspection schema - don't crash
@current_schema = @introspection_schema
@cached_etag = compute_etag(@current_schema)
@introspection_schema = nil
@initial_sync_completed = true
ForestAdminAgent::Facades::Container.logger&.log(
'Warn',
"RPC agent at #{@uri} is unreachable (#{error.class}: #{error.message}), " \
"using provided introspection schema (ETag: #{@cached_etag})"
)
else
# No introspection - re-raise to crash
ForestAdminAgent::Facades::Container.logger&.log(
'Error',
"Failed to get schema from RPC agent at #{@uri}: #{error.class} - #{error.message}"
)
raise error
end
end

def polling_loop
etag_status = @cached_etag ? "with ETag: #{@cached_etag}" : 'without initial ETag'
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Starting polling loop (interval: #{@polling_interval}s)"
"[Schema Polling] Starting polling loop (interval: #{@polling_interval}s, #{etag_status})"
)

loop do
break if @closed

etag_info = @cached_etag || 'none'
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{etag_info})"
)
sleep_with_interrupt(@polling_interval)
break if @closed

begin
check_schema
rescue StandardError => e
handle_error(e)
end
end
end

# Sleep with interrupt check (check every second for early termination)
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{@cached_etag || "none"})"
)
remaining = @polling_interval
while remaining.positive? && !@closed
sleep([remaining, 1].min)
remaining -= 1
end
def sleep_with_interrupt(duration)
remaining = duration
while remaining.positive? && !@closed
sleep([remaining, 1].min)
remaining -= 1
end
end

Expand Down Expand Up @@ -162,22 +231,27 @@ def handle_schema_unchanged
end

def handle_schema_changed(result)
new_etag = result.etag
@cached_etag.nil? ? handle_initial_schema(new_etag) : handle_schema_update(result.body, new_etag)
@connection_attempts = 0
end
new_schema = result.body
new_etag = result.etag || compute_etag(new_schema)

def handle_initial_schema(etag)
@cached_etag = etag
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Initial schema loaded successfully (ETag: #{etag})"
)
if @initial_sync_completed
handle_schema_update(new_schema, new_etag)
else
@cached_etag = new_etag
@current_schema = new_schema
@initial_sync_completed = true
ForestAdminAgent::Facades::Container.logger&.log(
'Info',
"[Schema Polling] Initial sync completed successfully (ETag: #{new_etag})"
)
end
@connection_attempts = 0
end

def handle_schema_update(schema, etag)
old_etag = @cached_etag
@cached_etag = etag
@current_schema = schema
msg = "[Schema Polling] Schema changed detected (old ETag: #{old_etag}, new ETag: #{etag}), " \
'triggering reload callback'
ForestAdminAgent::Facades::Container.logger&.log('Info', msg)
Expand Down
Loading
Loading