diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 623a0be6d..bb2f1fa97 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -203,6 +203,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # to see if they have come back to life config :resurrect_delay, :validate => :number, :default => 5 + # Control whether to remove empty valued params from action, specifically parent, _id, _routing + # In event_action_params() clean params from nil and empty values - when using dynamic fields + # (like document_id => "%{[@metadata][document_id]}"), empty or nil values could be produced + # for document_id and parent fields (in es_bulk metadata part), which is not valid es_bulk + # metadata format, thus those values should be cleaned. Default: false + config :remove_empty_action_params, :validate => :boolean, :default => false + # How long to wait before checking if the connection is stale before executing a request on a connection using keepalive. # You may want to set this lower, if you get connection errors regularly # Quoting the Apache commons docs (this client is based Apache Commmons): diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index d442b2fe2..c1384ee77 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -177,6 +177,10 @@ def event_action_params(event) params[:version_type] = event.sprintf(@version_type) end + if @remove_empty_action_params + params = params.delete_if { |k,v| v.nil? or (v.is_a? String and v.empty?) } + end + params end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 414468e9a..c34812d2c 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -321,6 +321,53 @@ end end + describe "remove empty params from action params" do + let(:options) { {"parent" => "%{myparent}", "document_id" => "%{myid}"} } + subject(:eso) {LogStash::Outputs::ElasticSearch.new(options)} + + let(:event_full) { LogStash::Event.new("message" => "blah", "myid" => "mytestid", "myparent" => "mytestparent") } + let(:event_partial_missing) { LogStash::Event.new("message" => "blah") } + let(:event_partial_empty) { LogStash::Event.new("message" => "blah", "myid" => "", "myparent" => "") } + + context "when remove_empty_action_params enabled" do + let(:options) { super.merge("remove_empty_action_params" => true) } + + it "should interpolate the requested id and parent values" do + params = eso.event_action_params(event_full) + expect(params).to include(:_id => "mytestid", :parent => "mytestparent") + end + + it "should interpolate the requested id and parent values and leave them as is (sprintf behavior, not sure if correct one) when they missing" do + params = eso.event_action_params(event_partial_missing) + expect(params).to include(:_id => "%{myid}", :parent => "%{myparent}") + end + + it "should not interpolate the requested id and parent values and remove them when they are empty" do + params = eso.event_action_params(event_partial_empty) + expect(params).not_to include(:_id, :parent) + end + end + + context "when remove_empty_action_params disabled" do + let(:options) { super.merge("remove_empty_action_params" => false) } + + it "should interpolate the requested id and parent values" do + params = eso.event_action_params(event_full) + expect(params).to include({:_id => "mytestid", :parent => "mytestparent"}) + end + + it "should interpolate the requested id and parent values and leave them as is (sprintf behavior, not sure if correct one) when they missing" do + params = eso.event_action_params(event_partial_missing) + expect(params).to include(:_id => "%{myid}", :parent => "%{myparent}") + end + + it "should interpolate the requested id and parent values and leave them as is (sprintf behavior, not sure if correct one) when they empty" do + params = eso.event_action_params(event_partial_missing) + expect(params).to include(:_id => "%{myid}", :parent => "%{myparent}") + end + end + end + describe "sleep interval calculation" do let(:retry_max_interval) { 64 } subject(:eso) { LogStash::Outputs::ElasticSearch.new("retry_max_interval" => retry_max_interval) }