Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e83e465
feat(schema-polling): implement initial schema fetch and improve poll…
slimee Dec 11, 2025
109cd32
feat(schema-polling): enhance schema polling with initial synchronous…
slimee Dec 11, 2025
faa6fb3
fix(schema polling): improve initial polling behavior and logging
slimee Dec 11, 2025
967d9bd
fix(rpc): fix RuboCop offenses in schema_polling_client
slimee Dec 11, 2025
0af3bc6
chore: fix typo
slimee Dec 12, 2025
74e8835
chore: merge main into perf/improve-schema-polling
slimee Dec 12, 2025
132044b
fix(rpc): remove redundant assignment in build method
slimee Dec 12, 2025
6215deb
fix(schema-polling): move digest require to top of file
slimee Dec 12, 2025
503b38a
chore: trigger CI rerun
slimee Dec 12, 2025
408e75e
fix(schema): update polling interval option naming and improve ETag h…
slimee Dec 12, 2025
e31919d
fix(schema-polling): improve initial sync handling and error reportin…
slimee Dec 12, 2025
b65252d
refactor(schema polling): update log levels for schema sync completio…
slimee Dec 12, 2025
d54d4d2
fix(schema_polling): remove unnecessary whitespace in initial_sync_co…
slimee Dec 12, 2025
36e2b48
fix(schema polling): trigger schema update callback only after initia…
slimee Dec 12, 2025
63c3956
feat(datasource): add graceful shutdown hook for schema polling cleanup
slimee Dec 12, 2025
ac06b19
refactor(schema_polling): improve thread stopping mechanism to avoid …
slimee Dec 12, 2025
9115481
fix(schema_polling): ensure graceful shutdown of polling thread
slimee Dec 12, 2025
ab74679
fix: call only one time the get schema at start
arnaud-moncel Dec 15, 2025
4053f38
fix: call with introspection
arnaud-moncel Dec 15, 2025
f62484a
chore: improve polling
arnaud-moncel Dec 15, 2025
af3e857
fix: polling start
arnaud-moncel Dec 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 22 additions & 118 deletions packages/forest_admin_datasource_rpc/lib/forest_admin_datasource_rpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,131 +8,35 @@
module ForestAdminDatasourceRpc
class Error < StandardError; end

# Build a RPC datasource with schema polling enabled.
#
# @param options [Hash] Configuration options
# @option options [String] :uri The URI of the RPC agent
# @option options [String] :auth_secret The authentication secret (optional, will use cache if not provided)
# @option options [Integer] :schema_polling_interval Polling interval in seconds (optional)
# - Default: 600 seconds (10 minutes)
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL']
# - Valid range: 1-3600 seconds
# - Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default
# - Example: SCHEMA_POLLING_INTERVAL=30 for development (30 seconds)
# @option options [Hash] :introspection Pre-defined schema introspection for resilient deployment
# - When provided, allows the datasource to start even if the RPC slave is unreachable
# - The introspection will be used as fallback when the slave connection fails
# - Schema polling will still be enabled to pick up changes when the slave becomes available
#
# @return [ForestAdminDatasourceRpc::Datasource] The configured datasource with schema polling
def self.build(options)
uri = options[:uri]
auth_secret = options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
provided_introspection = options[:introspection]
ForestAdminAgent::Facades::Container.logger.log('Info', "Getting schema from RPC agent on #{uri}.")

schema = nil

begin
rpc_client = Utils::RpcClient.new(uri, auth_secret)
response = rpc_client.fetch_schema('/forest/rpc-schema')
schema = response.body
rescue Faraday::ConnectionFailed => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Connection failed to RPC agent at #{uri}: #{e.message}\n#{e.backtrace.join("\n")}"
)
rescue Faraday::TimeoutError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Request timeout to RPC agent at #{uri}: #{e.message}"
)
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Authentication failed with RPC agent at #{uri}: #{e.message}"
)
rescue StandardError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Failed to get schema from RPC agent at #{uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
)
end

# Use provided introspection as fallback when slave is unreachable
if schema.nil? && provided_introspection
ForestAdminAgent::Facades::Container.logger.log(
'Warn',
"RPC agent at #{uri} is unreachable, using provided introspection for resilient deployment."
)
options.delete(:introspection)
schema = provided_introspection
polling_interval = if options[:schema_polling_interval_sec]
options[:schema_polling_interval_sec]
elsif ENV['SCHEMA_POLLING_INTERVAL_SEC']
ENV['SCHEMA_POLLING_INTERVAL_SEC'].to_i
else
600
end

polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options, provided_introspection) do
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end

if schema.nil?
# Return an empty datasource so the rest of the stack does not break
ForestAdminDatasourceToolkit::Datasource.new
else
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default (600)
polling_interval = if options[:schema_polling_interval]
options[:schema_polling_interval]
elsif ENV['SCHEMA_POLLING_INTERVAL']
ENV['SCHEMA_POLLING_INTERVAL'].to_i
else
600 # 10 minutes by default
end

polling_options = {
polling_interval: polling_interval
}
# Start polling (includes initial synchronous schema fetch)
# - Without introspection: crashes if RPC is unreachable
# - With introspection: falls back to introspection if RPC is unreachable
schema_polling.start

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end
schema_polling.start

datasource = ForestAdminDatasourceRpc::Datasource.new(options, schema, schema_polling)

# Setup cleanup hooks for proper schema polling client shutdown
setup_cleanup_hooks(datasource)

datasource
end
end

def self.setup_cleanup_hooks(datasource)
# Register cleanup handler for graceful shutdown
at_exit do
datasource.cleanup
rescue StandardError => e
# Swallow errors during exit cleanup to prevent test pollution; they are only reported when $VERBOSE is set
warn "[RPCDatasource] Error during at_exit cleanup: #{e.message}" if $VERBOSE
end

# Handle SIGINT (Ctrl+C)
Signal.trap('INT') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGINT, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
end

# Handle SIGTERM (default kill signal)
Signal.trap('TERM') do
begin
ForestAdminAgent::Facades::Container.logger&.log('Info', '[RPCDatasource] Received SIGTERM, cleaning up...')
rescue StandardError
# Logger might not be available
end
datasource.cleanup
exit(0)
end
@provided_introspection = nil
ForestAdminDatasourceRpc::Datasource.new(options, schema_polling.current_schema, schema_polling)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def fetch_schema(endpoint, if_none_match: nil)

private

DEFAULT_TIMEOUT = 30 # seconds
DEFAULT_OPEN_TIMEOUT = 10 # seconds

# rubocop:disable Metrics/ParameterLists
def make_request(endpoint, caller: nil, method: :get, payload: nil, symbolize_keys: false, if_none_match: nil)
log_request_start(method, endpoint, if_none_match)
Expand All @@ -63,6 +66,8 @@ def make_request(endpoint, caller: nil, method: :get, payload: nil, symbolize_ke
faraday.response :json, parser_options: { symbolize_names: symbolize_keys }
faraday.adapter Faraday.default_adapter
faraday.ssl.verify = !ForestAdminAgent::Facades::Container.cache(:debug)
faraday.options.timeout = DEFAULT_TIMEOUT
faraday.options.open_timeout = DEFAULT_OPEN_TIMEOUT
end

timestamp = Time.now.utc.iso8601(3)
Expand Down
Loading
Loading