Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e83e465
feat(schema-polling): implement initial schema fetch and improve poll…
slimee Dec 11, 2025
109cd32
feat(schema-polling): enhance schema polling with initial synchronous…
slimee Dec 11, 2025
faa6fb3
fix(schema polling): improve initial polling behavior and logging
slimee Dec 11, 2025
967d9bd
fix(rpc): fix RuboCop offenses in schema_polling_client
slimee Dec 11, 2025
0af3bc6
chore: fix typo
slimee Dec 12, 2025
74e8835
chore: merge main into perf/improve-schema-polling
slimee Dec 12, 2025
132044b
fix(rpc): remove redundant assignment in build method
slimee Dec 12, 2025
6215deb
fix(schema-polling): move digest require to top of file
slimee Dec 12, 2025
503b38a
chore: trigger CI rerun
slimee Dec 12, 2025
408e75e
fix(schema): update polling interval option naming and improve ETag h…
slimee Dec 12, 2025
e31919d
fix(schema-polling): improve initial sync handling and error reportin…
slimee Dec 12, 2025
b65252d
refactor(schema polling): update log levels for schema sync completio…
slimee Dec 12, 2025
d54d4d2
fix(schema_polling): remove unnecessary whitespace in initial_sync_co…
slimee Dec 12, 2025
36e2b48
fix(schema polling): trigger schema update callback only after initia…
slimee Dec 12, 2025
63c3956
feat(datasource): add graceful shutdown hook for schema polling cleanup
slimee Dec 12, 2025
ac06b19
refactor(schema_polling): improve thread stopping mechanism to avoid …
slimee Dec 12, 2025
9115481
fix(schema_polling): ensure graceful shutdown of polling thread
slimee Dec 12, 2025
ab74679
fix: call only one time the get schema at start
arnaud-moncel Dec 15, 2025
4053f38
fix: call with introspection
arnaud-moncel Dec 15, 2025
f62484a
chore: improve polling
arnaud-moncel Dec 15, 2025
af3e857
fix: polling start
arnaud-moncel Dec 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,48 @@ class Error < StandardError; end
# @option options [String] :auth_secret The authentication secret (optional, will use cache if not provided)
# @option options [Integer] :schema_polling_interval Polling interval in seconds (optional)
# - Default: 600 seconds (10 minutes)
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL']
# - Can be overridden with ENV['SCHEMA_POLLING_INTERVAL_SEC']
# - Valid range: 1-3600 seconds
# - Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default
# - Example: SCHEMA_POLLING_INTERVAL=30 for development (30 seconds)
# - Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL_SEC'] > default
# - Example: SCHEMA_POLLING_INTERVAL_SEC=30 for development (30 seconds)
#
# @return [ForestAdminDatasourceRpc::Datasource] The configured datasource with schema polling
def self.build(options)
uri = options[:uri]
auth_secret = options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
ForestAdminAgent::Facades::Container.logger.log('Info', "Getting schema from RPC agent on #{uri}.")

schema = nil
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL_SEC'] > default (600)
polling_interval = if options[:schema_polling_interval]
options[:schema_polling_interval]
elsif ENV['SCHEMA_POLLING_INTERVAL_SEC']
ENV['SCHEMA_POLLING_INTERVAL_SEC'].to_i
else
600 # 10 minutes by default
end

begin
rpc_client = Utils::RpcClient.new(uri, auth_secret)
response = rpc_client.fetch_schema('/forest/rpc-schema')
schema = response.body
rescue Faraday::ConnectionFailed => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Connection failed to RPC agent at #{uri}: #{e.message}\n#{e.backtrace.join("\n")}"
)
rescue Faraday::TimeoutError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Request timeout to RPC agent at #{uri}: #{e.message}"
)
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Authentication failed with RPC agent at #{uri}: #{e.message}"
)
rescue StandardError => e
ForestAdminAgent::Facades::Container.logger.log(
'Error',
"Failed to get schema from RPC agent at #{uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
)
polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end

# Start polling (includes initial synchronous schema fetch)
# The initial fetch is blocking, then async polling starts
schema_polling.start

# Get the schema from the polling client
schema = schema_polling.current_schema

if schema.nil?
# return empty datasource for not breaking stack
ForestAdminDatasourceToolkit::Datasource.new
else
# Create schema polling client with configurable polling interval
# Priority: options[:schema_polling_interval] > ENV['SCHEMA_POLLING_INTERVAL'] > default (600)
polling_interval = if options[:schema_polling_interval]
options[:schema_polling_interval]
elsif ENV['SCHEMA_POLLING_INTERVAL']
ENV['SCHEMA_POLLING_INTERVAL'].to_i
else
600 # 10 minutes by default
end

polling_options = {
polling_interval: polling_interval
}

schema_polling = Utils::SchemaPollingClient.new(uri, auth_secret, polling_options) do
# Callback when schema change is detected
logger = ForestAdminAgent::Facades::Container.logger
logger.log('Info', '[RPCDatasource] Schema change detected, reloading agent...')
ForestAdminAgent::Builder::AgentFactory.instance.reload!
end
schema_polling.start

datasource = ForestAdminDatasourceRpc::Datasource.new(options, schema, schema_polling)

# Setup cleanup hooks for proper schema polling client shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module ForestAdminDatasourceRpc
module Utils
class SchemaPollingClient
attr_reader :closed
attr_reader :closed, :current_schema

DEFAULT_POLLING_INTERVAL = 600 # seconds (10 minutes)
MIN_POLLING_INTERVAL = 1 # seconds (minimum safe interval)
Expand All @@ -18,6 +18,7 @@ def initialize(uri, auth_secret, options = {}, &on_schema_change)
@on_schema_change = on_schema_change
@closed = false
@cached_etag = nil
@current_schema = nil
@polling_thread = nil
@mutex = Mutex.new
@connection_attempts = 0
Expand All @@ -29,12 +30,22 @@ def initialize(uri, auth_secret, options = {}, &on_schema_change)
@rpc_client = RpcClient.new(@uri, @auth_secret)
end

def start
return if @closed
# Start schema polling with initial synchronous fetch
# The first schema fetch is done synchronously (blocking) to ensure
# the schema is available immediately. Then async polling starts.
#
# @return [Boolean] true if started successfully, false if already running or closed
def start # rubocop:disable Naming/PredicateMethod
return false if @closed

@mutex.synchronize do
return if @polling_thread&.alive?
return false if @polling_thread&.alive?

# Fetch initial schema synchronously before starting the polling thread
ForestAdminAgent::Facades::Container.logger&.log('Info', "Getting schema from RPC agent on #{@uri}.")
fetch_initial_schema_sync

# Start async polling thread
@polling_thread = Thread.new do
polling_loop
rescue StandardError => e
Expand All @@ -49,6 +60,7 @@ def start
'Info',
"[Schema Polling] Polling started (interval: #{@polling_interval}s)"
)
true
end

def stop
Expand All @@ -69,31 +81,76 @@ def stop

private

# Performs the first schema fetch in the calling thread (invoked by #start).
#
# On success, stores the response's ETag in @cached_etag and its body in
# @current_schema, then logs a Debug line. Any failure is logged at the
# 'Error' level and swallowed, leaving @cached_etag / @current_schema
# untouched, so the caller can detect the failure through a nil schema.
def fetch_initial_schema_sync
  # Single place that emits an 'Error' entry; the logger may be nil.
  report_failure = lambda do |message|
    ForestAdminAgent::Facades::Container.logger&.log('Error', message)
  end

  response = @rpc_client.fetch_schema('/forest/rpc-schema')
  @cached_etag = response.etag
  @current_schema = response.body

  success_message = "[Schema Polling] Initial schema fetched successfully (ETag: #{@cached_etag})"
  ForestAdminAgent::Facades::Container.logger&.log('Debug', success_message)
rescue Faraday::ConnectionFailed => e
  # Connection problems include the backtrace to help locate the failing layer.
  report_failure.call("Connection failed to RPC agent at #{@uri}: #{e.message}\n#{e.backtrace.join("\n")}")
rescue Faraday::TimeoutError => e
  report_failure.call("Request timeout to RPC agent at #{@uri}: #{e.message}")
rescue ForestAdminAgent::Http::Exceptions::AuthenticationOpenIdClient => e
  report_failure.call("Authentication failed with RPC agent at #{@uri}: #{e.message}")
rescue StandardError => e
  # Catch-all: report the exception class as well, since the cause is unknown.
  report_failure.call(
    "Failed to get schema from RPC agent at #{@uri}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
  )
end

def polling_loop
etag_status = @cached_etag ? "with ETag: #{@cached_etag}" : 'without initial ETag'
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Starting polling loop (interval: #{@polling_interval}s)"
"[Schema Polling] Starting polling loop (interval: #{@polling_interval}s, #{etag_status})"
)

first_check = true

loop do
break if @closed

# Wait before checking (skip wait on first iteration to start polling immediately)
unless first_check
etag_info = @cached_etag || 'none'
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{etag_info})"
)
sleep_with_interrupt(@polling_interval)
break if @closed
end
first_check = false

# Check for schema changes
begin
check_schema
rescue StandardError => e
handle_error(e)
end
end
end

# Sleep with interrupt check (check every second for early termination)
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Waiting #{@polling_interval}s before next check (current ETag: #{@cached_etag || "none"})"
)
remaining = @polling_interval
while remaining.positive? && !@closed
sleep([remaining, 1].min)
remaining -= 1
end
# Sleeps for roughly +duration+ seconds in slices of at most one second,
# re-checking @closed before each slice so the sleep ends early once the
# client has been closed.
#
# @param duration [Numeric] total time to wait, in seconds
# @return [nil]
def sleep_with_interrupt(duration)
  pending = duration
  loop do
    break unless pending.positive?
    break if @closed

    # Never sleep longer than one second at a time, so a close is noticed quickly.
    slice = [pending, 1].min
    sleep(slice)
    pending -= 1
  end
end

Expand Down Expand Up @@ -163,21 +220,27 @@ def handle_schema_unchanged

def handle_schema_changed(result)
new_etag = result.etag
@cached_etag.nil? ? handle_initial_schema(new_etag) : handle_schema_update(result.body, new_etag)
@connection_attempts = 0
end
new_schema = result.body

def handle_initial_schema(etag)
@cached_etag = etag
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Initial schema loaded successfully (ETag: #{etag})"
)
if @cached_etag.nil?
# Initial schema fetch
@cached_etag = new_etag
@current_schema = new_schema
ForestAdminAgent::Facades::Container.logger&.log(
'Debug',
"[Schema Polling] Initial schema loaded successfully (ETag: #{new_etag})"
)
else
# Schema update detected
handle_schema_update(new_schema, new_etag)
end
@connection_attempts = 0
end

def handle_schema_update(schema, etag)
old_etag = @cached_etag
@cached_etag = etag
@current_schema = schema
msg = "[Schema Polling] Schema changed detected (old ETag: #{old_etag}, new ETag: #{etag}), " \
'triggering reload callback'
ForestAdminAgent::Facades::Container.logger&.log('Info', msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ module ForestAdminDatasourceRpc
before do
logger = instance_double(Logger, log: nil)
allow(ForestAdminAgent::Facades::Container).to receive_messages(logger: logger, cache: 'secret')
allow(Utils::RpcClient).to receive(:new).and_return(rpc_client)
end

include_examples 'with introspection'

context 'when server is running' do
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, start: nil, stop: nil) }
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, start: true, stop: nil, current_schema: introspection) }
let(:response) { Utils::SchemaResponse.new(introspection, 'etag123') }
let(:rpc_client) { instance_double(Utils::RpcClient, fetch_schema: response) }

Expand All @@ -32,10 +31,11 @@ module ForestAdminDatasourceRpc
expect(datasource).to be_a(ForestAdminDatasourceRpc::Datasource)
end

it 'calls RPC client to get schema from /forest/rpc-schema' do
it 'starts schema polling which fetches the initial schema' do
described_class.build({ uri: 'http://localhost' })

expect(rpc_client).to have_received(:fetch_schema).with('/forest/rpc-schema')
expect(schema_polling_client).to have_received(:start)
expect(schema_polling_client).to have_received(:current_schema)
end

it 'creates datasource with collections from introspection' do
Expand All @@ -54,8 +54,14 @@ module ForestAdminDatasourceRpc
introspection_with_connections = introspection.merge(
native_query_connections: [{ name: 'primary' }, { name: 'secondary' }]
)
response_with_connections = Utils::SchemaResponse.new(introspection_with_connections, 'etag123')
allow(rpc_client).to receive(:fetch_schema).and_return(response_with_connections)
# Mock schema_polling_client to return introspection with connections
polling_client_with_connections = instance_double(
Utils::SchemaPollingClient,
start: true,
stop: nil,
current_schema: introspection_with_connections
)
allow(Utils::SchemaPollingClient).to receive(:new).and_return(polling_client_with_connections)

datasource = described_class.build({ uri: 'http://localhost' })

Expand All @@ -71,25 +77,23 @@ module ForestAdminDatasourceRpc
end

context 'when server is not running' do
let(:rpc_client) { instance_double(Utils::RpcClient, fetch_schema: nil) }
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, start: true, current_schema: nil) }

it 'returns empty datasource and logs error' do
allow(rpc_client).to receive(:fetch_schema).and_raise('server not running')
before do
allow(Utils::SchemaPollingClient).to receive(:new).and_return(schema_polling_client)
end

it 'returns empty datasource when schema fetch fails' do
datasource = described_class.build({ uri: 'http://localhost' })

expect(datasource).to be_a(ForestAdminDatasourceToolkit::Datasource)
expect(ForestAdminAgent::Facades::Container.logger).to have_received(:log).with(
'Error',
a_string_matching(%r{Failed to get schema from RPC agent at http://localhost.*server not running})
)
end
end

context 'with schema polling interval configuration' do
let(:response) { Utils::SchemaResponse.new(introspection, 'etag123') }
let(:rpc_client) { instance_double(Utils::RpcClient, fetch_schema: response) }
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, start: nil) }
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, start: true, current_schema: introspection) }

before do
allow(Utils::RpcClient).to receive(:new).and_return(rpc_client)
Expand Down Expand Up @@ -118,8 +122,8 @@ module ForestAdminDatasourceRpc
)
end

it 'uses ENV["SCHEMA_POLLING_INTERVAL"] when set' do
ENV['SCHEMA_POLLING_INTERVAL'] = '30'
it 'uses ENV["SCHEMA_POLLING_INTERVAL_SEC"] when set' do
ENV['SCHEMA_POLLING_INTERVAL_SEC'] = '30'

described_class.build({ uri: 'http://localhost' })

Expand All @@ -130,11 +134,11 @@ module ForestAdminDatasourceRpc
any_args
)
ensure
ENV.delete('SCHEMA_POLLING_INTERVAL')
ENV.delete('SCHEMA_POLLING_INTERVAL_SEC')
end

it 'prioritizes options[:schema_polling_interval] over ENV' do
ENV['SCHEMA_POLLING_INTERVAL'] = '30'
ENV['SCHEMA_POLLING_INTERVAL_SEC'] = '30'

described_class.build({ uri: 'http://localhost', schema_polling_interval: 120 })

Expand All @@ -145,7 +149,7 @@ module ForestAdminDatasourceRpc
any_args
)
ensure
ENV.delete('SCHEMA_POLLING_INTERVAL')
ENV.delete('SCHEMA_POLLING_INTERVAL_SEC')
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ module Utils
client.stop

# Should have continued polling after failure
expect(logger).to have_received(:log).with('Warn', /Connection error/)
# Initial sync fetch failure logs as 'Error', polling loop failures log as 'Warn'
expect(logger).to have_received(:log).with('Error', /Connection failed/)
expect(logger).to have_received(:log).with('Debug', /Initial schema loaded successfully/)
end

Expand Down