Skip to content

Commit

Permalink
feat(events): Stream events to kafka (getlago#1411)
Browse files Browse the repository at this point in the history
* feat(events): Stream events to kafka

* fix pronto
  • Loading branch information
jdenquin authored Oct 25, 2023
1 parent 9eb9e92 commit 4aae17b
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ gem 'google-cloud-storage', require: false
gem 'slim'
gem 'slim-rails'

# Kafka
gem 'karafka'

group :development, :test, :staging do
gem 'factory_bot_rails'
gem 'faker'
Expand Down
16 changes: 16 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ GEM
activerecord
kaminari-core (= 1.2.2)
kaminari-core (1.2.2)
karafka (2.2.9)
karafka-core (>= 2.2.2, < 2.3.0)
waterdrop (>= 2.6.10, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.2.3)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.6, < 0.14.0)
karafka-rdkafka (0.13.6)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
lograge (0.13.0)
actionpack (>= 4)
activesupport (>= 4)
Expand All @@ -281,6 +292,7 @@ GEM
memoist (0.16.2)
method_source (1.0.0)
mini_mime (1.1.2)
mini_portile2 (2.8.5)
minitest (5.18.0)
monetize (1.12.0)
money (~> 6.12)
Expand Down Expand Up @@ -487,6 +499,9 @@ GEM
execjs (>= 0.3.0, < 3)
unicode-display_width (2.5.0)
version_gem (1.1.1)
waterdrop (2.6.10)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)
webmock (3.14.0)
addressable (>= 2.8.0)
crack (>= 0.3.2)
Expand Down Expand Up @@ -536,6 +551,7 @@ DEPENDENCIES
i18n-tasks!
jwt
kaminari-activerecord
karafka
lograge
lograge-sql
logstash-event
Expand Down
18 changes: 18 additions & 0 deletions app/services/events/create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def call

result.event = event

produce_kafka_event(event)
Events::PostProcessJob.perform_later(event:)

result
Expand All @@ -36,5 +37,22 @@ def call
private

attr_reader :organization, :params, :timestamp, :metadata

def produce_kafka_event(event)
return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank?

Karafka.producer.produce_sync(
topic: 'events-raw',
payload: {
organization_id: organization.id,
external_customer_id: event.external_customer_id,
external_subscription_id: event.external_subscription_id,
transaction_id: event.transaction_id,
timestamp: event.timestamp,
code: event.code,
properties: event.properties,
}.to_json,
)
end
end
end
30 changes: 30 additions & 0 deletions karafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

class KarafkaApp < Karafka::App
setup do |config|
config.kafka = { 'bootstrap.servers': ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'] }
config.client_id = 'Lago'
# Recreate consumers with each batch. This will allow Rails code reload to work in the
# development mode. Otherwise Karafka process would not be aware of code changes
config.consumer_persistence = !Rails.env.development?
end

# Comment out this part if you are not using instrumentation and/or you are not
# interested in logging events for certain environments. Since instrumentation
# notifications add extra boilerplate, if you want to achieve max performance,
# listen to only what you really need for given environment.
Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)
# Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)

# This logger prints the producer development info using the Karafka logger.
# It is similar to the consumer logger listener but producer oriented.
Karafka.producer.monitor.subscribe(
WaterDrop::Instrumentation::LoggerListener.new(
# Log producer operations using the Karafka logger
Karafka.logger,
# If you set this to true, logs will contain each message details
# Please note, that this can be extensive
log_messages: false,
),
)
end
17 changes: 17 additions & 0 deletions spec/services/events/create_service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,22 @@
expect(result.event.timestamp.iso8601(3)).to eq('2023-09-04T15:45:12.344Z')
end
end

context 'when kafka is configured' do
let(:karafka_producer) { instance_double(WaterDrop::Producer) }

before do
ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'] = 'kafka'
end

it 'produces the event on kafka' do
allow(Karafka).to receive(:producer).and_return(karafka_producer)
allow(karafka_producer).to receive(:produce_sync)

create_service.call

expect(karafka_producer).to have_received(:produce_sync)
end
end
end
end

0 comments on commit 4aae17b

Please sign in to comment.