Skip to content

Commit 7cc4283

Browse files
committed
🎉 introduce :filter and :update_expressions
This commit introduces :filter (instead of :query-key and :query-value) for defining the query that we pass down to Mongo. It is a hash and will allow us to filter by a composite key. In addition to that :update_expressions is now another optional hash that can be added to *replace* the default $set operator. The hash is a set of Mongo Update Expressions (https://docs.mongodb.com/manual/reference/operator/update/#id1) and the values are also substituted. Note that pipeline (Mongo >= 4.2) support is not there yet. Finally, action is now fully dynamic and expanded via sprintf as in logstash-output-elasticsearch.
1 parent 9d977cb commit 7cc4283

12 files changed

+325
-121
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
Gemfile.lock
33
.bundle
44
vendor
5+
logs

README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
# Logstash Plugin
1+
# Logstash Mongo Output Plugin
22

3-
[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-output-mongodb.svg)](https://travis-ci.org/logstash-plugins/logstash-output-mongodb)
3+
---
4+
This is a fork of [logstash-plugins/logstash-output-mongodb](https://github.com/logstash-plugins/logstash-output-mongodb).
5+
6+
It adds the :action, :filter, :update_expressions and :upsert parameters
7+
---
48

59
This is a plugin for [Logstash](https://github.com/elastic/logstash).
610

@@ -21,20 +25,17 @@ Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/log
2125

2226
### 1. Plugin Developement and Testing
2327

24-
#### Code
25-
- To get started, you'll need JRuby with the Bundler gem installed.
28+
For developing this plugin we use the wonderful work of [cameronkerrnz/logstash-plugin-dev](https://github.com/cameronkerrnz/logstash-plugin-dev):
2629

27-
- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example).
30+
To start an interactive environment run:
2831

29-
- Install dependencies
30-
```sh
31-
bundle install
32+
``` sh
33+
docker run --rm -it -v ${PWD}:/work cameronkerrnz/logstash-plugin-dev:7.9
3234
```
3335

34-
#### Test
35-
36-
- Update your dependencies
36+
After that you can run the usual suspects:
3737

38+
- Install/Update dependencies
3839
```sh
3940
bundle install
4041
```
@@ -95,4 +96,4 @@ Programming is not a required skill. Whatever you've seen about open source and
9596

9697
It is more important to the community that you are able to contribute.
9798

98-
For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
99+
For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.

lib/logstash/outputs/mongodb.rb

Lines changed: 121 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,55 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
3434
# "_id" field in the event.
3535
config :generateId, :validate => :boolean, :default => false
3636

37-
3837
# Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one.
3938
config :bulk, :validate => :boolean, :default => false
39+
4040
# Bulk interval, Used to insert events periodically if the "bulk" flag is activated.
4141
config :bulk_interval, :validate => :number, :default => 2
42+
4243
# Bulk events number, if the number of events to insert into a collection raise that limit, it will be bulk inserted
4344
# whatever the bulk interval value (mongodb hard limit is 1000).
4445
config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2
4546

46-
# The method used to write processed events to MongoDB.
47-
# Possible values are `insert`, `update` and `replace`.
48-
config :action, :validate => :string, :required => true
49-
# The key of the query to find the document to update or replace.
50-
config :query_key, :validate => :string, :required => false, :default => "_id"
51-
# The value of the query to find the document to update or replace. This can be dynamic using the `%{foo}` syntax.
52-
config :query_value, :validate => :string, :required => false
47+
# The Mongo DB action to perform. Valid actions are:
48+
#
49+
# - insert: inserts a document, fails if a document the document already exists.
50+
# - update: updates a document given a `filter`. You can also upsert a document, see the `upsert` option.
51+
# - delete: *Not Supported* at the moment
52+
#
53+
# A sprintf-able string is allowed to change the action based on the content
54+
# of the event. The value `%{[foo]}` would use the foo field for the action.
55+
#
56+
# For more details on actions, check out the https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[Mongo Ruby Driver documentation]
57+
config :action, :validate => :string, :default => "insert"
58+
59+
# The :filter clause for an update or replace.
60+
#
61+
# A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would
62+
# use the foo field instead *always coerced to a string*.
63+
#
64+
# Hovewever, the
65+
# https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field
66+
# Reference Syntax] is required for values - these preserve type (integer,
67+
# float, ...).
68+
config :filter, :validate => :hash, :required => false, :default => {}
69+
70+
# The hash in :update_expressions will be used *instead* of the default
71+
# '$set'. This option is useful for using alternative operators like '$inc'.
72+
#
73+
# A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would
74+
# use the foo field instead *always coerced to a string*.
75+
#
76+
# Hovewever, the
77+
# https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field
78+
# Reference Syntax] is required for values - these preserve type (integer,
79+
# float, ...).
80+
#
81+
# Keys must start with `$`, see the https://docs.mongodb.com/manual/reference/operator/update/#id1[Mongo DB Update Operators] for reference.
82+
#
83+
# Note that pipeline support (Mongo >= 4.2) is not there yet.
84+
config :update_expressions, :validate => :hash, :required => false, :default => nil
85+
5386
# If true, a new document is created if no document exists in DB with given `document_id`.
5487
# Only applies if action is `update` or `replace`.
5588
config :upsert, :validate => :boolean, :required => false, :default => false
@@ -88,20 +121,37 @@ def validate_config
88121
if @bulk_size > 1000
89122
raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
90123
end
91-
if @action != "insert" && @action != "update" && @action != "replace"
92-
raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting."
124+
if !@update_expressions.nil?
125+
@update_expressions.keys.each { |k|
126+
if !is_update_operator(k)
127+
raise LogStash::ConfigurationError, "The :update_expressions option contains '#{k}', which is not an Update expression."
128+
break
129+
end
130+
}
93131
end
94-
if (@action == "update" || @action == "replace") && (@query_value.nil? || @query_value.empty?)
95-
raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set."
132+
end
133+
134+
def validate_action(action, filter, update_expressions)
135+
if action != "insert" && action != "update" && action != "replace"
136+
raise LogStash::ConfigurationError, "Only insert, update and replace are supported Mongo actions, got '#{action}'."
137+
end
138+
if (action == "update" || action == "replace") && (filter.nil? || filter.empty?)
139+
raise LogStash::ConfigurationError, "If action is update or replace, filter must be set."
140+
end
141+
if action != "update" && !(update_expressions.nil? || update_expressions.empty?)
142+
raise LogStash::ConfigurationError, "The :update_expressions only makes sense if the action is an update."
96143
end
97144
end
98145

99146
def receive(event)
147+
action = event.sprintf(@action)
148+
149+
validate_action(action, @filter, @update_expressions)
150+
100151
begin
101152
# Our timestamp object now has a to_bson method, using it here
102153
# {}.merge(other) so we don't taint the event hash innards
103154
document = {}.merge(event.to_hash)
104-
105155
if !@isodate
106156
timestamp = event.timestamp
107157
if timestamp
@@ -117,9 +167,19 @@ def receive(event)
117167
end
118168

119169
collection = event.sprintf(@collection)
120-
if @action == "update" or @action == "replace"
121-
document["metadata_mongodb_output_query_value"] = event.sprintf(@query_value)
170+
if action == "update" or action == "replace"
171+
document["metadata_mongodb_output_filter"] = apply_event_to_hash(event, @filter)
122172
end
173+
174+
if action == "update" and !(@update_expressions.nil? || @update_expressions.empty?)
175+
# we only expand the values cause keys are update expressions
176+
expressions_hash = {}
177+
@update_expressions.each do |k, v|
178+
expressions_hash[k] = apply_event_to_hash(event, v)
179+
end
180+
document["metadata_mongodb_output_update_expressions"] = expressions_hash
181+
end
182+
123183
if @bulk
124184
@@mutex.synchronize do
125185
if(!@documents[collection])
@@ -133,7 +193,8 @@ def receive(event)
133193
end
134194
end
135195
else
136-
write_to_mongodb(collection, [document])
196+
result = write_to_mongodb(collection, [document])
197+
@logger.debug("Bulk write result", :result => result)
137198
end
138199
rescue => e
139200
if e.message =~ /^E11000/
@@ -144,7 +205,7 @@ def receive(event)
144205
# to fix the issue.
145206
@logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e)
146207
else
147-
@logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e)
208+
@logger.warn("Failed to send event to MongoDB retrying in #{@retry_delay.to_s} seconds", :result=> e.result, :message => e.message)
148209
sleep(@retry_delay)
149210
retry
150211
end
@@ -153,25 +214,61 @@ def receive(event)
153214

154215
def write_to_mongodb(collection, documents)
155216
ops = get_write_ops(documents)
217+
@logger.debug("Sending", :ops => ops)
156218
@db[collection].bulk_write(ops)
157219
end
158220

159221
def get_write_ops(documents)
160222
ops = []
161223
documents.each do |doc|
162-
replaced_query_value = doc["metadata_mongodb_output_query_value"]
163-
doc.delete("metadata_mongodb_output_query_value")
164-
if @action == "insert"
224+
filter = doc["metadata_mongodb_output_filter"]
225+
doc.delete("metadata_mongodb_output_filter")
226+
227+
update_expressions = doc["metadata_mongodb_output_update_expressions"]
228+
doc.delete("metadata_mongodb_output_update_expressions")
229+
230+
# TODO: support multiple expressions as pipeline for Mongo >= 4.2
231+
update = if !update_expressions.nil?
232+
update_expressions
233+
else
234+
{'$set' => to_dotted_hash(doc)}
235+
end
236+
237+
if action == "insert"
165238
ops << {:insert_one => doc}
166-
elsif @action == "update"
167-
ops << {:update_one => {:filter => {@query_key => replaced_query_value}, :update => {'$set' => to_dotted_hash(doc)}, :upsert => @upsert}}
168-
elsif @action == "replace"
169-
ops << {:replace_one => {:filter => {@query_key => replaced_query_value}, :replacement => doc, :upsert => @upsert}}
239+
elsif action == "update"
240+
ops << {:update_one => {:filter => filter, :update => update, :upsert => @upsert}}
241+
elsif action == "replace"
242+
ops << {:replace_one => {:filter => filter, :replacement => doc, :upsert => @upsert}}
170243
end
171244
end
172245
ops
173246
end
174247

248+
def is_update_operator(string)
249+
string.start_with?("$")
250+
end
251+
252+
# Apply the event to the input hash keys and values.
253+
#
254+
# This function is recursive.
255+
#
256+
# It uses event.sprintf for keys but event.get for values because it looks
257+
# like event.sprintf always returns a string and we don't want to always
258+
# coerce.
259+
#
260+
# See https://github.com/elastic/logstash/issues/5114
261+
def apply_event_to_hash(event, hash)
262+
hash.clone.each_with_object({}) do |(k, v), ret|
263+
if v.is_a? Hash
264+
ret[event.sprintf(k)] = apply_event_to_hash(event, v)
265+
else
266+
event_value = event.get(v)
267+
ret[event.sprintf(k)] = event_value.nil? ? v : event_value
268+
end
269+
end
270+
end
271+
175272
def to_dotted_hash(hash, recursive_key = "")
176273
hash.each_with_object({}) do |(k, v), ret|
177274
key = recursive_key + k.to_s

logstash-output-mongodb.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-mongodb'
3-
s.version = '3.2.0'
3+
s.version = '3.2.1'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Writes events to MongoDB"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/mongodb_spec.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
let(:uri) { 'mongodb://localhost:27017' }
77
let(:database) { 'logstash' }
88
let(:collection) { 'logs' }
9-
let(:uuid) { SecureRandom.uuid }
10-
let(:action) { 'insert' }
9+
let(:action) { 'insert' }
1110

1211
let(:config) do
1312
{ "uri" => uri, "database" => database,
@@ -19,8 +18,10 @@
1918
subject { LogStash::Outputs::Mongodb.new(config) }
2019

2120
let(:properties) { { "message" => "This is a message!",
22-
"uuid" => uuid, "number" => BigDecimal.new("4321.1234"),
23-
"utf8" => "żółć", "int" => 42,
21+
"uuid" => "00000000-0000-0000-0000-000000000000",
22+
"number" => BigDecimal.new("4321.1234"),
23+
"utf8" => "żółć",
24+
"int" => 42,
2425
"arry" => [42, "string", 4321.1234]} }
2526
let(:event) { LogStash::Event.new(properties) }
2627

0 commit comments

Comments
 (0)