From 7b2bd8a05864ac8b7a813897766ba407a02a9f8c Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 10 Sep 2015 09:17:52 +0530 Subject: [PATCH 1/3] Support for dynamic field values in prefix --- lib/logstash/outputs/s3.rb | 172 +++++++++++++++++++++++++------------ 1 file changed, 118 insertions(+), 54 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 8538d4b4..d0fd4a6c 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,6 +8,7 @@ require "thread" require "tmpdir" require "fileutils" +require 'pathname' # INFORMATION: @@ -59,6 +60,7 @@ # size_file => 2048 (optional) # time_file => 5 (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) +# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -109,15 +111,8 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify how many workers to use to upload the files to S3 config :upload_workers_count, :validate => :number, :default => 1 - # Define tags to be appended to the file on the S3 bucket. - # - # Example: - # tags => ["elasticsearch", "logstash", "kibana"] - # - # Will generate this file: - # "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt" - # - config :tags, :validate => :array, :default => [] + # Specify after how many interval of time_file, a prefix directory should be cleaned up locally if no events happing for it + config :no_event_wait, :validate => :number, :default => 5 # Exposed attributes for testing purpose. 
attr_accessor :tempfile @@ -148,8 +143,13 @@ def aws_service_endpoint(region) def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new file - remote_filename = "#{@prefix}#{File.basename(file)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) @@ -169,17 +169,21 @@ def write_on_bucket(file) # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close + def create_temporary_file(prefix) + filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix])) + @file_rotation_lock[prefix].synchronize do + unless @tempfile[prefix].nil? + @tempfile[prefix].close + end + + if @prefixes.include? 
prefix + dirname = File.dirname(filename) + unless File.directory?(dirname) + FileUtils.mkdir_p(dirname) + end + @logger.debug("S3: Creating a new temporary file", :filename => filename) + @tempfile[prefix] = File.open(filename, "a") end - - @tempfile = File.open(filename, "a") end end @@ -194,7 +198,11 @@ def register @s3 = aws_s3_config @upload_queue = Queue.new - @file_rotation_lock = Mutex.new + @file_rotation_lock = Hash.new + @tempfile = Hash.new + @page_counter = Hash.new + @prefixes = Set.new + @empty_uploads = Hash.new if @prefix && @prefix =~ S3_INVALID_CHARACTERS @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) @@ -206,15 +214,14 @@ def register end test_s3_write - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file + #reset_page_counter + #create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @codec.on_event do |event, encoded_event| - handle_event(encoded_event) + handle_event(encoded_event, event) end end @@ -251,13 +258,36 @@ def restore_from_crashes end end + public + def shouldcleanup(prefix) + return @empty_uploads[prefix] > @no_event_wait + end + public def move_file_to_bucket(file) + + @logger.debug("S3: moving to bucket ", :file => file) + + basepath = Pathname.new @temporary_directory + dirname = Pathname.new File.dirname(file) + prefixpath = dirname.relative_path_from basepath + prefix = prefixpath.to_s + @logger.debug("S3: moving the file for prefix", :prefix => prefix) + if !File.zero?(file) + if @prefixes.include? prefix + @empty_uploads[prefix] = 0 + end write_on_bucket(file) @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + else + if @prefixes.include? 
prefix + @empty_uploads[prefix] += 1 + end end + @logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix]) + begin File.delete(file) rescue Errno::ENOENT @@ -266,6 +296,10 @@ def move_file_to_bucket(file) rescue Errno::EACCES @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end + + if shouldcleanup(prefix) + cleanprefix(prefix) + end end public @@ -292,9 +326,10 @@ def receive(event) end public - def rotate_events_log? - @file_rotation_lock.synchronize do - @tempfile.size > @size_file + + def rotate_events_log(prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].size > @size_file end end @@ -304,12 +339,13 @@ def write_events_to_multiple_files? end public - def write_to_tempfile(event) + def write_to_tempfile(event, prefix) + begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix])) - @file_rotation_lock.synchronize do - @tempfile.syswrite(event) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].syswrite(event) end rescue Errno::ENOSPC @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) @@ -318,12 +354,14 @@ def write_to_tempfile(event) end public - def close + def close() shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? + + for prefix in @prefixes + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed? 
+ end end end @@ -334,20 +372,29 @@ def shutdown_upload_workers end private - def handle_event(encoded_event) + def handle_event(encoded_event, event) + actualprefix = event.sprintf(@prefix) + if not @prefixes.to_a().include? actualprefix + @file_rotation_lock[actualprefix] = Mutex.new + @prefixes.add(actualprefix) + reset_page_counter(actualprefix) + create_temporary_file(actualprefix) + @empty_uploads[actualprefix] = 0 + end + if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) + if rotate_events_log(actualprefix) + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix])) - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + move_file_to_bucket_async(@tempfile[actualprefix].path) + next_page(actualprefix) + create_temporary_file(actualprefix) else - @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file) end end - write_to_tempfile(encoded_event) + write_to_tempfile(encoded_event, actualprefix) end private @@ -355,16 +402,33 @@ def configure_periodic_rotation @periodic_rotation_thread = Stud::Task.new do LogStash::Util::set_thread_name("<S3 periodic uploader>") - Stud.interval(periodic_interval, :sleep_then_run => true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + Stud.interval(periodic_interval, :sleep_then_run => true) do + + @tempfile.keys.each do |key| + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path) + move_file_to_bucket_async(@tempfile[key].path) + next_page(key) + create_temporary_file(key) + end + end end end + private + def cleanprefix(prefix) + path = 
File.join(@temporary_directory, prefix) + @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close + Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'} + FileUtils.remove_dir(path) + @prefixes.delete(prefix) + @tempfile.delete(prefix) + @empty_uploads[prefix] = 0 + end + end + private def configure_upload_workers @logger.debug("S3: Configure upload workers") @@ -397,13 +461,13 @@ def upload_worker end private - def next_page - @page_counter += 1 + def next_page(key) + @page_counter[key] += 1 end private - def reset_page_counter - @page_counter = 0 + def reset_page_counter(key) + @page_counter[key] = 0 end private From 6519235f0982999cbaead40dd71a6dc7fb68bd68 Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 10 Sep 2015 09:28:34 +0530 Subject: [PATCH 2/3] Fixing the issue with deletion of temp file along with the dynamic prefix changes --- lib/logstash/outputs/s3.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index d0fd4a6c..f5b0ff73 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -473,8 +473,13 @@ def reset_page_counter(key) private def delete_on_bucket(filename) bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new filename - remote_filename = "#{@prefix}#{File.basename(filename)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket) From 6fe9ff79d8d12ed5fcab59d48e82655009b840a2 Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 26 Nov 2015 09:31:46 +0530 Subject: [PATCH 3/3] Incorporating the review comments for the dynamic prefix based s3 outputs --- lib/logstash/outputs/s3.rb | 32 +++++++++++++------------------- 1 
file changed, 13 insertions(+), 19 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index f5b0ff73..36426914 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,7 +8,7 @@ require "thread" require "tmpdir" require "fileutils" -require 'pathname' +require "pathname" # INFORMATION: @@ -60,7 +60,7 @@ # size_file => 2048 (optional) # time_file => 5 (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) -# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that) +# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -144,8 +144,8 @@ def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] - first = Pathname.new @temporary_directory - second = Pathname.new file + first = Pathname.new(@temporary_directory) + second = Pathname.new(file) remote_filename_path = second.relative_path_from first @@ -176,11 +176,9 @@ def create_temporary_file(prefix) @tempfile[prefix].close end - if @prefixes.include? 
prefix + if @prefixes.include?(prefix) dirname = File.dirname(filename) - unless File.directory?(dirname) - FileUtils.mkdir_p(dirname) - end + FileUtils.mkdir_p(dirname) unless File.directory?(dirname) @logger.debug("S3: Creating a new temporary file", :filename => filename) @tempfile[prefix] = File.open(filename, "a") end @@ -215,8 +213,6 @@ def register test_s3_write restore_from_crashes if @restore == true - #reset_page_counter - #create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @@ -259,7 +255,7 @@ def restore_from_crashes end public - def shouldcleanup(prefix) + def need_cleanup?(prefix) return @empty_uploads[prefix] > @no_event_wait end @@ -270,8 +266,7 @@ def move_file_to_bucket(file) basepath = Pathname.new @temporary_directory dirname = Pathname.new File.dirname(file) - prefixpath = dirname.relative_path_from basepath - prefix = prefixpath.to_s + prefix = dirname.relative_path_from(basepath).to_s @logger.debug("S3: moving the file for prefix", :prefix => prefix) if !File.zero?(file) @@ -297,9 +292,8 @@ def move_file_to_bucket(file) @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end - if shouldcleanup(prefix) - cleanprefix(prefix) - end + clean_prefix(prefix) if need_cleanup?(prefix) + end public @@ -358,7 +352,7 @@ def close() shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - for prefix in @prefixes + @prefixes.each do |prefix| @file_rotation_lock[prefix].synchronize do @tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed? end @@ -374,7 +368,7 @@ def shutdown_upload_workers private def handle_event(encoded_event, event) actualprefix = event.sprintf(@prefix) - if not @prefixes.to_a().include? actualprefix + if !@prefixes.include? 
actualprefix @file_rotation_lock[actualprefix] = Mutex.new @prefixes.add(actualprefix) reset_page_counter(actualprefix) @@ -416,7 +410,7 @@ def configure_periodic_rotation end private - def cleanprefix(prefix) + def clean_prefix(prefix) path = File.join(@temporary_directory, prefix) @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix) @file_rotation_lock[prefix].synchronize do