16
16
# See the License for the specific language governing permissions and
17
17
# limitations under the License.
18
18
#
19
+ require_relative 'kubernetes_metadata_stats'
19
20
module Fluent
20
21
class KubernetesMetadataFilter < Fluent ::Filter
21
22
K8_POD_CA_CERT = 'ca.crt'
@@ -57,6 +58,7 @@ class KubernetesMetadataFilter < Fluent::Filter
57
58
:default => '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$'
58
59
59
60
config_param :annotation_match , :array , default : [ ]
61
+ config_param :stats_interval , :integer , default : 30
60
62
61
63
def syms_to_strs ( hsh )
62
64
newhsh = { }
@@ -110,15 +112,63 @@ def parse_namespace_metadata(namespace_object)
110
112
def get_pod_metadata ( namespace_name , pod_name )
111
113
begin
112
114
metadata = @client . get_pod ( pod_name , namespace_name )
113
- return if !metadata
114
- return parse_pod_metadata ( metadata )
115
- rescue KubeException
115
+ unless metadata
116
+ @stats . bump ( :pod_cache_api_nil_not_found )
117
+ else
118
+ begin
119
+ metadata = parse_pod_metadata ( metadata )
120
+ @stats . bump ( :pod_cache_api_updates )
121
+ return metadata
122
+ rescue Exception => e
123
+ log . debug ( e )
124
+ @stats . bump ( :pod_cache_api_nil_bad_resp_payload )
125
+ nil
126
+ end
127
+ end
128
+ rescue KubeException => e
129
+ @stats . bump ( :pod_cache_api_nil_error )
130
+ log . debug "Exception encountered fetching pod metadata from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ e . message } "
131
+ nil
132
+ end
133
+ end
134
+
135
+ def dump_stats
136
+ @curr_time = Time . now
137
+ return if @curr_time . to_i - @prev_time . to_i < @stats_interval
138
+ @prev_time = @curr_time
139
+ @stats . set ( :pod_cache_size , @cache . count )
140
+ @stats . set ( :namespace_cache_size , @namespace_cache . count )
141
+ log . info ( @stats )
142
+ end
143
+
144
+ def get_namespace_metadata ( namespace_name )
145
+ begin
146
+ metadata = @client . get_namespace ( namespace_name )
147
+ unless metadata
148
+ @stats . bump ( :namespace_cache_api_nil_not_found )
149
+ else
150
+ begin
151
+ metadata = parse_namespace_metadata ( metadata )
152
+ @stats . bump ( :namespace_cache_api_updates )
153
+ return metadata
154
+ rescue Exception => e
155
+ log . debug ( e )
156
+ @stats . bump ( :namespace_cache_api_nil_bad_resp_payload )
157
+ nil
158
+ end
159
+ end
160
+ rescue KubeException => kube_error
161
+ @stats . bump ( :namespace_cache_api_nil_error )
162
+ log . debug "Exception encountered fetching namespace metadata from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ kube_error . message } "
116
163
nil
117
164
end
118
165
end
119
166
120
167
def initialize
121
168
super
169
+ @stats = KubernetesMetadata ::Stats . new
170
+ @prev_time = Time . now
171
+
122
172
end
123
173
124
174
def configure ( conf )
@@ -235,6 +285,7 @@ def get_metadata_for_record(namespace_name, pod_name, container_name)
235
285
236
286
this = self
237
287
pod_metadata = @cache . getset ( cache_key ) {
288
+ @stats . bump ( :pod_cache_miss )
238
289
md = this . get_pod_metadata (
239
290
namespace_name ,
240
291
pod_name ,
@@ -245,14 +296,8 @@ def get_metadata_for_record(namespace_name, pod_name, container_name)
245
296
246
297
if @include_namespace_metadata
247
298
namespace_metadata = @namespace_cache . getset ( namespace_name ) {
248
- begin
249
- namespace = @client . get_namespace ( namespace_name )
250
- if namespace
251
- parse_namespace_metadata ( namespace )
252
- end
253
- rescue KubeException
254
- nil
255
- end
299
+ @stats . bump ( :namespace_cache_miss )
300
+ get_namespace_metadata ( namespace_name )
256
301
}
257
302
metadata . merge! ( namespace_metadata ) if namespace_metadata
258
303
end
@@ -289,7 +334,7 @@ def filter_stream_from_files(tag, es)
289
334
290
335
new_es . add ( time , record )
291
336
}
292
-
337
+ dump_stats
293
338
new_es
294
339
end
295
340
@@ -317,9 +362,11 @@ def filter_stream_from_journal(tag, es)
317
362
end
318
363
unless metadata
319
364
log . debug "Error: could not match CONTAINER_NAME from record #{ record } "
365
+ @stats . dump ( :container_name_match_failed )
320
366
end
321
367
elsif record . has_key? ( 'CONTAINER_NAME' ) && record [ 'CONTAINER_NAME' ] . start_with? ( 'k8s_' )
322
368
log . debug "Error: no container name and id in record #{ record } "
369
+ @stats . dump ( :container_name_id_missing )
323
370
end
324
371
325
372
if metadata
@@ -329,6 +376,7 @@ def filter_stream_from_journal(tag, es)
329
376
new_es . add ( time , record )
330
377
}
331
378
379
+ dump_stats
332
380
new_es
333
381
end
334
382
@@ -341,7 +389,9 @@ def merge_json_log(record)
341
389
unless @preserve_json_log
342
390
record . delete ( @merge_json_log_key )
343
391
end
344
- rescue JSON ::ParserError
392
+ rescue JSON ::ParserError => e
393
+ @stats . bump ( :merge_json_parse_errors )
394
+ log . debug ( e )
345
395
end
346
396
end
347
397
end
@@ -376,9 +426,7 @@ def start_watch
376
426
watcher = @client . watch_pods ( resource_version )
377
427
rescue Exception => e
378
428
message = "Exception encountered fetching metadata from Kubernetes API endpoint: #{ e . message } "
379
- if e . respond_to? ( :response )
380
- message += " (#{ e . response } )"
381
- end
429
+ message += " (#{ e . response } )" if e . respond_to? ( :response )
382
430
383
431
raise Fluent ::ConfigError , message
384
432
end
@@ -390,33 +438,50 @@ def start_watch
390
438
cached = @cache [ cache_key ]
391
439
if cached
392
440
@cache [ cache_key ] = parse_pod_metadata ( notice . object )
441
+ @stats . bump ( :pod_cache_watch_updates )
442
+ else
443
+ @stats . bump ( :pod_cache_watch_misses )
393
444
end
394
445
when 'DELETED'
395
446
cache_key = "#{ notice . object [ 'metadata' ] [ 'namespace' ] } _#{ notice . object [ 'metadata' ] [ 'name' ] } "
396
447
@cache . delete ( cache_key )
448
+ @stats . bump ( :pod_cache_watch_deletes )
397
449
else
398
450
# Don't pay attention to creations, since the created pod may not
399
451
# end up on this node.
452
+ @stats . bump ( :pod_cache_watch_ignored )
400
453
end
401
454
end
402
455
end
403
456
404
457
def start_namespace_watch
405
- resource_version = @client . get_namespaces . resourceVersion
406
- watcher = @client . watch_namespaces ( resource_version )
458
+ begin
459
+ resource_version = @client . get_namespaces . resourceVersion
460
+ watcher = @client . watch_namespaces ( resource_version )
461
+ rescue Exception => e
462
+ message = "start_namespace_watch: Exception encountered setting up namespace watch from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ e . message } "
463
+ message += " (#{ e . response } )" if e . respond_to? ( :response )
464
+ log . debug ( message )
465
+ raise Fluent ::ConfigError , message
466
+ end
407
467
watcher . each do |notice |
408
468
case notice . type
409
469
when 'MODIFIED'
410
470
cache_key = notice . object [ 'metadata' ] [ 'name' ]
411
471
cached = @namespace_cache [ cache_key ]
412
472
if cached
413
473
@namespace_cache [ cache_key ] = parse_namespace_metadata ( notice . object )
474
+ @stats . bump ( :namespace_cache_watch_updates )
475
+ else
476
+ @stats . bump ( :namespace_cache_watch_misses )
414
477
end
415
478
when 'DELETED'
416
479
@namespace_cache . delete ( notice . object [ 'metadata' ] [ 'name' ] )
480
+ @stats . bump ( :namespace_cache_watch_deletes )
417
481
else
418
482
# Don't pay attention to creations, since the created namespace may not
419
483
# be used by any pod on this node.
484
+ @stats . bump ( :namespace_cache_watch_ignored )
420
485
end
421
486
end
422
487
end
0 commit comments