Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ def self.healthy?
Shoryuken::Runner.instance.healthy?
end

# Returns the global instrumentation monitor.
# Use this to subscribe to Shoryuken lifecycle events.
#
# @return [Shoryuken::Instrumentation::Notifications] the monitor instance
#
# @example Subscribe to message processing events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
# @example Subscribe to all events
# Shoryuken.monitor.subscribe do |event|
# logger.info("Event: #{event.name}")
# end
def self.monitor
@_monitor ||= Instrumentation::Notifications.new
end

# Resets the monitor instance (useful for testing)
#
# @return [void]
# @api private
def self.reset_monitor!
@_monitor = nil
end

def_delegators(
:shoryuken_options,
:active_job?,
Expand Down
14 changes: 10 additions & 4 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def fetch(queue, limit)
fetch_with_auto_retry(3) do
started_at = Time.now

logger.debug { "Looking for new messages in #{queue}" }
Shoryuken.monitor.publish('fetcher.started', queue: queue.name, limit: limit)

sqs_msgs = Array(receive_messages(queue, limit))

logger.debug { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty?
logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" }
Shoryuken.monitor.publish('fetcher.completed',
queue: queue.name,
message_count: sqs_msgs.size,
duration_ms: elapsed(started_at))

sqs_msgs
end
Expand All @@ -54,7 +56,11 @@ def fetch_with_auto_retry(max_attempts)

attempts += 1

logger.debug { "Retrying fetch attempt #{attempts} for #{e.message}" }
Shoryuken.monitor.publish('fetcher.retry',
attempt: attempts,
max_attempts: max_attempts,
error_message: e.message,
error_class: e.class.name)

sleep((1..5).to_a.sample)

Expand Down
18 changes: 18 additions & 0 deletions lib/shoryuken/instrumentation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require_relative 'instrumentation/event'
require_relative 'instrumentation/notifications'
require_relative 'instrumentation/logger_listener'

module Shoryuken
# Instrumentation module providing pub/sub event notifications.
# Inspired by Karafka's instrumentation architecture.
#
# @example Subscribing to events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
module Instrumentation
end
end
62 changes: 62 additions & 0 deletions lib/shoryuken/instrumentation/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Represents an instrumentation event with metadata.
# Events are published through the Notifications system and contain
# information about what happened, when, and relevant context.
#
# @example Creating an event
# event = Event.new('message.processed', queue: 'default', duration: 0.5)
# event.name # => 'message.processed'
# event[:queue] # => 'default'
# event.duration # => 0.5
#
class Event
# @return [String] the event name (e.g., 'message.processed')
attr_reader :name

# @return [Hash] the event payload containing contextual data
attr_reader :payload

# @return [Time] when the event was created
attr_reader :time

# Creates a new Event instance
#
# @param name [String] the event name using dot notation (e.g., 'message.processed')
# @param payload [Hash] contextual data for the event
def initialize(name, payload = {})
@name = name
@payload = payload
@time = Time.now
end

# Accesses a value from the payload by key
#
# @param key [Symbol, String] the payload key
# @return [Object, nil] the value or nil if not found
def [](key)
payload[key]
end

# Returns the duration from the payload if present
#
# @return [Float, nil] the duration in seconds or nil
def duration
payload[:duration]
end

# Returns a hash representation of the event
#
# @return [Hash] the event as a hash
def to_h
{
name: name,
payload: payload,
time: time
}
end
end
end
end
143 changes: 143 additions & 0 deletions lib/shoryuken/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Default listener that logs instrumentation events.
# This provides human-readable log output for key Shoryuken events.
#
# @example Subscribing the logger listener
# Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call))
#
class LoggerListener
# Creates a new LoggerListener
#
# @param logger [Logger] the logger to use (defaults to Shoryuken.logger)
def initialize(logger = nil)
@logger = logger
end

# Returns the logger instance
#
# @return [Logger] the logger
def logger
@logger || Shoryuken.logger
end

# Handles an instrumentation event by logging it appropriately
#
# @param event [Event] the event to handle
# @return [void]
def call(event)
method_name = "on_#{event.name.tr('.', '_')}"
send(method_name, event) if respond_to?(method_name, true)
end

private

# App lifecycle events

def on_app_started(event)
groups = event[:groups] || []
logger.info { "Shoryuken started with #{groups.size} group(s)" }
end

def on_app_stopping(_event)
logger.info { 'Shoryuken shutting down...' }
end

def on_app_stopped(_event)
logger.info { 'Shoryuken stopped' }
end

def on_app_quiet(_event)
logger.info { 'Shoryuken is quiet' }
end

# Fetcher events

def on_fetcher_started(event)
logger.debug { "Looking for new messages in #{event[:queue]}" }
end

def on_fetcher_completed(event)
queue = event[:queue]
message_count = event[:message_count] || 0
duration_ms = event[:duration_ms]

logger.debug { "Found #{message_count} messages for #{queue}" } if message_count.positive?
logger.debug { "Fetcher for #{queue} completed in #{duration_ms} ms" }
end

def on_fetcher_retry(event)
logger.debug { "Retrying fetch attempt #{event[:attempt]} for #{event[:error_message]}" }
end

# Manager events

def on_manager_dispatch(event)
logger.debug do
"Ready: #{event[:ready]}, Busy: #{event[:busy]}, Active Queues: #{event[:active_queues]}"
end
end

def on_manager_processor_assigned(event)
logger.debug { "Assigning #{event[:message_id]}" }
end

def on_manager_failed(event)
logger.error { "Manager failed: #{event[:error_message]}" }
logger.error { event[:backtrace].join("\n") } if event[:backtrace]
end

# Message processing events

def on_message_processed(event)
# Skip logging if there was an exception - error.occurred handles that
return if event[:exception]

duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'

logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
end

def on_message_failed(event)
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'
error = event[:error]
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Failed #{worker}/#{queue}: #{error_message}" }
end

# Error events

def on_error_occurred(event)
error = event[:error]
error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
error_message = error.respond_to?(:message) ? error.message : error.to_s
type = event[:type]

if type
logger.error { "Error in #{type}: #{error_class} - #{error_message}" }
else
logger.error { "Error occurred: #{error_class} - #{error_message}" }
end

logger.error { error.backtrace.join("\n") } if error.respond_to?(:backtrace) && error.backtrace
end

# Queue events

def on_queue_polling(event)
queue = event[:queue] || 'Unknown'
logger.debug { "Polling queue: #{queue}" }
end

def on_queue_empty(event)
logger.debug { "Queue #{event[:queue]} is empty" }
end
end
end
end
Loading
Loading