Skip to content

Commit

Permalink
Merge pull request #36 from emacski/feature/journal-entry-filter
Browse files Browse the repository at this point in the history
Basic Systemd Journal Entry Mutation
  • Loading branch information
errm authored Jul 25, 2017
2 parents 9afd7c7 + 7ee8715 commit a9ae098
Show file tree
Hide file tree
Showing 8 changed files with 604 additions and 73 deletions.
84 changes: 77 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# systemd input plugin for [Fluentd](http://github.com/fluent/fluentd)
# systemd plugin for [Fluentd](http://github.com/fluent/fluentd)

[![Build Status](https://travis-ci.org/reevoo/fluent-plugin-systemd.svg?branch=master)](https://travis-ci.org/reevoo/fluent-plugin-systemd) [![Code Climate GPA](https://codeclimate.com/github/reevoo/fluent-plugin-systemd/badges/gpa.svg)](https://codeclimate.com/github/reevoo/fluent-plugin-systemd) [![Gem Version](https://badge.fury.io/rb/fluent-plugin-systemd.svg)](https://rubygems.org/gems/fluent-plugin-systemd)

## Overview

**systemd** input plugin reads logs from the systemd journal
**systemd** input plugin reads logs from the systemd journal
**systemd** filter plugin allows for basic manipulation of systemd journal entries

## Support

Expand Down Expand Up @@ -33,19 +34,24 @@ or

td-agent-gem install fluent-plugin-systemd -v 0.2.0

## Configuration
## Input Plugin Configuration

<source>
@type systemd
tag kube-proxy
path /var/log/journal
filters [{ "_SYSTEMD_UNIT": "kube-proxy.service" }]
read_from_head true
<storage>
@type local
persistent true
path kube-proxy.pos
</storage>
tag kube-proxy
read_from_head true
<entry>
field_map {"MESSAGE": "log", "_PID": ["process", "pid"], "_CMDLINE": "process", "_COMM": "cmd"}
fields_strip_underscores true
fields_lowercase true
</entry>
</source>

<match kube-proxy>
Expand Down Expand Up @@ -84,19 +90,83 @@ If true reads all available journal from head, otherwise starts reading from tai

**`strip_underscores`**

_This parameter is deprecated and will be removed in favour of entry in v1.0._

If true strips underscores from the beginning of systemd field names.
May be useful if outputting to kibana, as underscore prefixed fields are unindexed there.

**`entry`**

Optional configuration for an embeded systemd entry filter. See the [Filter Plugin Configuration](#filter-plugin-configuration) for config reference.

**`tag`**

_Required_
_Required_

A tag that will be added to events generated by this input.

## Example
### Example

For an example of a full working setup including the plugin, [take a look at](https://github.com/assemblyline/fluentd)

## Filter Plugin Configuration

<filter kube-proxy>
@type systemd_entry
field_map {"MESSAGE": "log", "_PID": ["process", "pid"], "_CMDLINE": "process", "_COMM": "cmd"}
field_map_strict false
fields_lowercase true
fields_strip_underscores true
</filter>

**`field_map`**

Object / hash defining a mapping of source fields to destination fields. Destination fields may be existing or new user-defined fields. If multiple source fields are mapped to the same destination field, the contents of the fields will be appended to the destination field in the order defined in the mapping. A field map declaration takes the form of:

{
"<src_field1>": "<dst_field1>",
"<src_field2>": ["<dst_field1>", "<dst_field2>"],
...
}
Defaults to an empty map.

**`field_map_strict`**

If true, only destination fields from `field_map` are included in the result. Defaults to false.

**`fields_lowercase`**

If true, lowercase all non-mapped fields. Defaults to false.

**`fields_strip_underscores`**

If true, strip leading underscores from all non-mapped fields. Defaults to false.

### Example

Given a systemd journal source entry:
```
{
"_MACHINE_ID": "bb9d0a52a41243829ecd729b40ac0bce"
"_HOSTNAME": "arch"
"MESSAGE": "this is a log message",
"_PID": "123"
"_CMDLINE": "login -- root"
"_COMM": "login"
}
```
The resulting entry using the above sample configuration:
```
{
"machine_id": "bb9d0a52a41243829ecd729b40ac0bce"
"hostname": "arch",
"msg": "this is a log message",
"pid": "123"
"cmd": "login"
"process": "123 login -- root"
}
```

## Dependencies

This plugin depends on libsystemd
Expand Down
29 changes: 29 additions & 0 deletions lib/fluent/plugin/filter_systemd_entry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true
require "fluent/plugin/filter"
require "fluent/plugin/systemd/entry_mutator"

module Fluent
module Plugin
# Fluentd systemd/journal filter plugin
class SystemdEntryFilter < Filter
Fluent::Plugin.register_filter("systemd_entry", self)

config_param :field_map, :hash, default: {}
config_param :field_map_strict, :bool, default: false
config_param :fields_strip_underscores, :bool, default: false
config_param :fields_lowercase, :bool, default: false

def configure(conf)
super
@mutator = SystemdEntryMutator.new(**@config_root_section.to_h)
if @mutator.field_map_strict && @mutator.field_map.empty?
log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields")
end
end

def filter(_tag, _time, entry)
@mutator.run(entry)
end
end
end
end
23 changes: 19 additions & 4 deletions lib/fluent/plugin/in_systemd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "systemd/journal"
require "fluent/plugin/input"
require "fluent/plugin/systemd/pos_writer"
require "fluent/plugin/systemd/entry_mutator"

module Fluent
module Plugin
Expand All @@ -16,7 +17,8 @@ class SystemdInput < Input
config_param :filters, :array, default: []
config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead"
config_param :read_from_head, :bool, default: false
config_param :strip_underscores, :bool, default: false
config_param :strip_underscores, :bool, default: false, deprecated: "Use <entry> section or `systemd_entry` " \
"filter plugin instead"
config_param :tag, :string

config_section :storage do
Expand All @@ -25,10 +27,24 @@ class SystemdInput < Input
config_set_default :persistent, false
end

config_section :entry, param_name: "entry_opts", required: false, multi: false do
config_param :field_map, :hash, default: {}
config_param :field_map_strict, :bool, default: false
config_param :fields_strip_underscores, :bool, default: false
config_param :fields_lowercase, :bool, default: false
end

def configure(conf)
super
@pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions"))
@journal = nil
@pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions"))
# legacy strip_underscores backwards compatibility (legacy takes
# precedence and is mutually exclusive with the entry block)
mut_opts = @strip_underscores ? { fields_strip_underscores: true } : @entry_opts.to_h
@mutator = SystemdEntryMutator.new(**mut_opts)
if @mutator.field_map_strict && @mutator.field_map.empty?
log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields")
end
end

def start
Expand Down Expand Up @@ -107,8 +123,7 @@ def emit(entry)
end

def formatted(entry)
return entry.to_h unless @strip_underscores
Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]
@mutator.run(entry)
end

def watch
Expand Down
151 changes: 151 additions & 0 deletions lib/fluent/plugin/systemd/entry_mutator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# frozen_string_literal: true
require "fluent/config/error"

module Fluent
module Plugin
# A simple stand-alone configurable mutator for systemd journal entries.
#
# Note regarding field mapping:
# The input `field_map` option is meant to have a structure that is
# intuative or logical for humans when declaring a field map.
# {
# "<source_field1>" => "<new_field1>",
# "<source_field2>" => ["<new_field1>", "<new_field2>"]
# }
# Internally the inverse of the human-friendly field_map is
# computed (and cached) upon object creation and used as a "mapped model"
# {
# "<new_field1>" => ["<source_field1>", "<source_field2>"],
# "<new_field2>" => ["<source_field2>"]
# }
class SystemdEntryMutator

Options = Struct.new(
:field_map,
:field_map_strict,
:fields_lowercase,
:fields_strip_underscores,
)

def self.default_opts
Options.new({}, false, false, false)
end

# Constructor keyword options (all other kwargs are ignored):
# field_map - hash describing the desired field mapping in the form:
# {"<source_field>" => "<new_field>", ...}
# where `new_field` is a string or array of strings
# field_map_strict - boolean if true will only include new fields
# defined in `field_map`
# fields_strip_underscores - boolean if true will strip all leading
# underscores from non-mapped fields
# fields_lowercase - boolean if true lowercase all non-mapped fields
#
# raises `Fluent::ConfigError` for invalid options
def initialize(**options)
@opts = options_from_hash(options)
validate_options(@opts)
@map = invert_field_map(@opts.field_map)
@map_src_fields = @opts.field_map.keys
@no_transform = @opts == self.class.default_opts
end

# Expose config state as read-only instance properties of the mutator.
def method_missing(sym, *args)
return @opts[sym] if @opts.members.include?(sym)
super
end

# The main run method that performs all configured mutations, if any,
# against a single journal entry. Returns the mutated entry hash.
# entry - hash or `Systemd::Journal:Entry`
def run(entry)
return entry.to_h if @no_transform
return map_fields(entry) if @opts.field_map_strict
format_fields(entry, map_fields(entry))
end

# Run field mapping against a single journal entry. Returns the mutated
# entry hash.
# entry - hash or `Systemd::Journal:Entry`
def map_fields(entry)
mapped = {}
@map.each do |cstm, sysds|
vals = sysds.collect { |fld| entry[fld] }.compact
next if vals.empty? # systemd field does not exist in source entry
mapped[cstm] = vals.length == 1 ? vals[0] : vals.join(" ")
end
mapped
end

# Run field formatting (mutations applied to all non-mapped fields)
# against a single journal entry. Returns the mutated entry hash.
# entry - hash or `Systemd::Journal:Entry`
# mapped - Optional hash that represents a previously mapped entry to
# which the formatted fields will be added
def format_fields(entry, mapped = nil)
mapped ||= {}
entry.each do |fld, val|
# don't mess with explicitly mapped fields
next if @map_src_fields.include?(fld)
fld = fld.gsub(/\A_+/, "") if @opts.fields_strip_underscores
fld = fld.downcase if @opts.fields_lowercase
# account for mapping (appending) to an existing systemd field
mapped[fld] = mapped.key?(fld) ? [val, mapped[fld]].join(" ") : val
end
mapped
end

private

# Returns a `SystemdEntryMutator::Options` struct derived from the
# elements in the supplied hash merged with the option defaults
def options_from_hash(opts)
merged = self.class.default_opts
merged.each_pair do |k, _|
merged[k] = opts[k] if opts.key?(k)
end
merged
end

def validate_options(opts)
unless validate_strings_or_empty(opts[:field_map].keys)
err = "`field_map` keys must be strings"
end
unless validate_strings_or_empty(opts[:field_map].values, true)
err = "`field_map` values must be strings or array of strings"
end
%i[field_map_strict fields_strip_underscores fields_lowercase].each do |opt|
err = "`#{opt}` must be boolean" unless [true, false].include?(opts[opt])
end
fail Fluent::ConfigError, err unless err.nil?
end

# Validates that values in array `arr` are strings. If `nested` is true
# also allow and validate that `arr` values can be an array of strings
def validate_strings_or_empty(arr, nested = false)
return true if arr.empty?
arr.each do |v|
return true if v.is_a?(String)
if v.is_a?(Array) && nested
v.each { |nstd| return false unless nstd.is_a?(String) }
end
end
false
end

# Compute the inverse of a human friendly field map `fm` which is what
# the mutator uses for the actual mapping. The resulting structure for
# the inverse field map hash is:
# {"<new_field_name>" => ["<source_field_name>", ...], ...}
def invert_field_map(fm)
invs = {}
fm.values.flatten.uniq.each do |cstm|
sysds = fm.select { |_, v| (v == cstm || v.include?(cstm)) }
invs[cstm] = sysds.keys
end
invs
end
end
end
end
1 change: 0 additions & 1 deletion test/helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# frozen_string_literal: true
require "test/unit"
require "fluent/test"
require "fluent/test/driver/input"
require "fluent/test/helpers"
Loading

0 comments on commit a9ae098

Please sign in to comment.