Skip to content

Commit 8abb4c7

Browse files
authored
Merge pull request #151 from josefkarasek/v0.12_detect-metadata-source
[v0.12] detect metadata source
2 parents c826816 + 83436cb commit 8abb4c7

File tree

5 files changed

+1166
-112
lines changed

5 files changed

+1166
-112
lines changed

README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,33 @@ This must used named capture groups for `container_name`, `pod_name` & `namespac
4444
* `watch` - set up a watch on pods on the API server for updates to metadata (default: `true`)
4545
* `de_dot` - replace dots in labels and annotations with configured `de_dot_separator`, required for ElasticSearch 2.x compatibility (default: `true`)
4646
* `de_dot_separator` - separator to use if `de_dot` is enabled (default: `_`)
47-
* `use_journal` - If false (default), messages are expected to be formatted and tagged as if read by the fluentd in\_tail plugin with wildcard filename. If true, messages are expected to be formatted as if read from the systemd journal. The `MESSAGE` field has the full message. The `CONTAINER_NAME` field has the encoded k8s metadata (see below). The `CONTAINER_ID_FULL` field has the full container uuid. This requires docker to use the `--log-driver=journald` log driver.
47+
* *DEPRECATED* `use_journal` - If false, messages are expected to be formatted and tagged as if read by the fluentd in\_tail plugin with wildcard filename. If true, messages are expected to be formatted as if read from the systemd journal. The `MESSAGE` field has the full message. The `CONTAINER_NAME` field has the encoded k8s metadata (see below). The `CONTAINER_ID_FULL` field has the full container uuid. This requires docker to use the `--log-driver=journald` log driver. If unset (the default), the plugin will use the `CONTAINER_NAME` and `CONTAINER_ID_FULL` fields
48+
if available, otherwise, will use the tag in the `tag_to_kubernetes_name_regexp` format.
4849
* `container_name_to_kubernetes_regexp` - The regular expression used to extract the k8s metadata encoded in the journal `CONTAINER_NAME` field (default: `'^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$'`
4950
* This corresponds to the definition [in the source](https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L317)
5051
* `annotation_match` - Array of regular expressions matching annotation field names. Matched annotations are added to a log record.
5152
* `allow_orphans` - Modify the namespace and namespace id to the values of `orphaned_namespace_name` and `orphaned_namespace_id`
5253
when true (default: `true`)
5354
* `orphaned_namespace_name` - The namespace to associate with records where the namespace can not be determined (default: `.orphaned`)
5455
* `orphaned_namespace_id` - The namespace id to associate with records where the namespace can not be determined (default: `orphaned`)
56+
* `lookup_from_k8s_field` - If the field `kubernetes` is present, lookup the metadata from the given subfields such as `kubernetes.namespace_name`, `kubernetes.pod_name`, etc. This allows you to avoid having to pass in metadata to lookup in an explicitly formatted tag name or in an explicitly formatted `CONTAINER_NAME` value. For example, set `kubernetes.namespace_name`, `kubernetes.pod_name`, `kubernetes.container_name`, and `docker.id` in the record, and the filter will fill in the rest. (default: `true`)
5557

5658
**NOTE:** As of the release 1.1.x of this plugin, it no longer supports parsing the source message into JSON and attaching it to the
5759
payload. The following configuration options are removed:
5860

5961
* `merge_json_log`
6062
* `preserve_json_log`
6163

64+
**NOTE** As of this release, the use of `use_journal` is **DEPRECATED**. If this setting is not present, the plugin will
65+
attempt to figure out the source of the metadata fields from the following:
66+
- If `lookup_from_k8s_field true` (the default) and the following fields are present in the record:
67+
`docker.container_id`, `kubernetes.namespace_name`, `kubernetes.pod_name`, `kubernetes.container_name`,
68+
then the plugin will use those values as the source to use to lookup the metadata
69+
- If `use_journal true`, or `use_journal` is unset, and the fields `CONTAINER_NAME` and `CONTAINER_ID_FULL` are present in the record,
70+
then the plugin will parse those values using `container_name_to_kubernetes_regexp` and use those as the source to lookup the metadata
71+
- Otherwise, if the tag matches `tag_to_kubernetes_name_regexp`, the plugin will parse the tag and use those values to
72+
lookup the metdata
73+
6274
Reading from the JSON formatted log files with `in_tail` and wildcard filenames:
6375
```
6476
<source>

lib/fluent/plugin/filter_kubernetes_metadata.rb

Lines changed: 67 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class KubernetesMetadataFilter < Fluent::Filter
5555
# format:
5656
# CONTAINER_NAME=k8s_$containername.$containerhash_$podname_$namespacename_$poduuid_$rand32bitashex
5757
# CONTAINER_FULL_ID=dockeridassha256hexvalue
58-
config_param :use_journal, :bool, default: false
58+
config_param :use_journal, :bool, default: nil
5959
# Field 2 is the container_hash, field 5 is the pod_id, and field 6 is the pod_randhex
6060
# I would have included them as named groups, but you can't have named groups that are
6161
# non-capturing :P
@@ -69,6 +69,7 @@ class KubernetesMetadataFilter < Fluent::Filter
6969
config_param :allow_orphans, :bool, default: true
7070
config_param :orphaned_namespace_name, :string, default: '.orphaned'
7171
config_param :orphaned_namespace_id, :string, default: 'orphaned'
72+
config_param :lookup_from_k8s_field, :bool, default: true
7273

7374
def fetch_pod_metadata(namespace_name, pod_name)
7475
log.trace("fetching pod metadata: #{namespace_name}/#{pod_name}") if log.trace?
@@ -234,13 +235,10 @@ def log.trace?
234235
namespace_thread.abort_on_exception = true
235236
end
236237
end
237-
if @use_journal
238-
log.debug "Will stream from the journal"
239-
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_journal }
240-
else
241-
log.debug "Will stream from the files"
242-
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_files }
243-
end
238+
@time_fields = []
239+
@time_fields.push('_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP') if @use_journal || @use_journal.nil?
240+
@time_fields.push('time') unless @use_journal
241+
@time_fields.push('@timestamp') if @lookup_from_k8s_field
244242

245243
@annotations_regexps = []
246244
@annotation_match.each do |regexp|
@@ -253,102 +251,95 @@ def log.trace?
253251

254252
end
255253

256-
def get_metadata_for_record(match_data, container_id, create_time, batch_miss_cache)
257-
namespace_name = match_data['namespace']
258-
pod_name = match_data['pod_name']
259-
container_name = match_data['container_name']
254+
def get_metadata_for_record(namespace_name, pod_name, container_name, container_id, create_time, batch_miss_cache)
260255
metadata = {
261-
'container_name' => container_name,
262-
'namespace_name' => namespace_name,
263-
'pod_name' => pod_name
256+
'docker' => {'container_id' => container_id},
257+
'kubernetes' => {
258+
'container_name' => container_name,
259+
'namespace_name' => namespace_name,
260+
'pod_name' => pod_name
261+
}
264262
}
265263
if @kubernetes_url.present?
266264
pod_metadata = get_pod_metadata(container_id, namespace_name, pod_name, create_time, batch_miss_cache)
267265

268266
if (pod_metadata.include? 'containers') && (pod_metadata['containers'].include? container_id)
269-
metadata['container_image'] = pod_metadata['containers'][container_id]['image']
270-
metadata['container_image_id'] = pod_metadata['containers'][container_id]['image_id']
267+
metadata['kubernetes']['container_image'] = pod_metadata['containers'][container_id]['image']
268+
metadata['kubernetes']['container_image_id'] = pod_metadata['containers'][container_id]['image_id']
271269
end
272270

273-
metadata.merge!(pod_metadata) if pod_metadata
274-
metadata.delete('containers')
271+
metadata['kubernetes'].merge!(pod_metadata) if pod_metadata
272+
metadata['kubernetes'].delete('containers')
275273
end
276274
metadata
277275
end
278276

279-
def create_time_from_record(record)
280-
time = if @use_journal
281-
record['_SOURCE_REALTIME_TIMESTAMP'].nil? ? record['_SOURCE_REALTIME_TIMESTAMP'] : record['__REALTIME_TIMESTAMP']
282-
else
283-
record['time']
284-
end
285-
(time.nil? || time.chop.empty?) ? Time.now : Time.parse(time)
286-
end
287-
288-
def filter_stream(tag, es)
289-
es
290-
end
291-
292-
def filter_stream_from_files(tag, es)
293-
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
294-
new_es = MultiEventStream.new
295-
296-
match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled)
297-
batch_miss_cache = {}
298-
if match_data
299-
container_id = match_data['docker_id']
300-
metadata = {
301-
'docker' => {
302-
'container_id' => container_id
303-
},
304-
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(es.first[1]), batch_miss_cache)
305-
}
277+
def create_time_from_record(record, internal_time)
278+
time_key = @time_fields.detect{ |ii| record.has_key?(ii) }
279+
time = record[time_key]
280+
if time.nil? || time.chop.empty?
281+
return internal_time
306282
end
307-
308-
es.each do |time, record|
309-
record = record.merge(Marshal.load(Marshal.dump(metadata))) if metadata
310-
new_es.add(time, record)
283+
if ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'].include?(time_key)
284+
timei= time.to_i
285+
return Time.at(timei / 1000000, timei % 1000000)
311286
end
312-
dump_stats
313-
new_es
287+
return Time.parse(time)
314288
end
315289

316-
def filter_stream_from_journal(tag, es)
290+
def filter_stream(tag, es)
317291
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
318-
new_es = MultiEventStream.new
292+
new_es = Fluent::MultiEventStream.new
293+
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
294+
tag_metadata = nil
319295
batch_miss_cache = {}
320296
es.each do |time, record|
321-
metadata = nil
322-
if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
323-
metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data|
324-
container_id = record['CONTAINER_ID_FULL']
325-
metadata = {
326-
'docker' => {
327-
'container_id' => container_id
328-
},
329-
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(record), batch_miss_cache)
330-
}
331-
332-
metadata
333-
end
334-
unless metadata
335-
log.debug "Error: could not match CONTAINER_NAME from record #{record}"
336-
@stats.bump(:container_name_match_failed)
337-
end
338-
elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_')
339-
log.debug "Error: no container name and id in record #{record}"
340-
@stats.bump(:container_name_id_missing)
297+
if tag_match_data && tag_metadata.nil?
298+
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
299+
tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache)
300+
end
301+
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
302+
if (@use_journal || @use_journal.nil?) &&
303+
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
304+
metadata = j_metadata
305+
end
306+
if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') &&
307+
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
308+
record['kubernetes'].has_key?('namespace_name') &&
309+
record['kubernetes'].has_key?('pod_name') &&
310+
record['kubernetes'].has_key?('container_name') &&
311+
record['docker'].has_key?('container_id') &&
312+
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
313+
record['kubernetes']['container_name'], record['docker']['container_id'],
314+
create_time_from_record(record, time), batch_miss_cache))
315+
metadata = k_metadata
341316
end
342317

343318
record = record.merge(metadata) if metadata
344-
345319
new_es.add(time, record)
346320
end
347-
348321
dump_stats
349322
new_es
350323
end
351324

325+
def get_metadata_for_journal_record(record, time, batch_miss_cache)
326+
metadata = nil
327+
if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
328+
metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data|
329+
get_metadata_for_record(match_data['namespace'], match_data['pod_name'], match_data['container_name'],
330+
record['CONTAINER_ID_FULL'], create_time_from_record(record, time), batch_miss_cache)
331+
end
332+
unless metadata
333+
log.debug "Error: could not match CONTAINER_NAME from record #{record}"
334+
@stats.bump(:container_name_match_failed)
335+
end
336+
elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_')
337+
log.debug "Error: no container name and id in record #{record}"
338+
@stats.bump(:container_name_id_missing)
339+
end
340+
metadata
341+
end
342+
352343
def de_dot!(h)
353344
h.keys.each do |ref|
354345
if h[ref] && ref =~ /\./

0 commit comments

Comments
 (0)