Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ in development
Fixed
~~~~~


* Fix deserialization bug in st2 API for url encoded payloads. #5536

Contributed by @sravs-dev
Expand Down Expand Up @@ -176,6 +175,14 @@ Fixed

Contributed by @minsis

* Fixed trigger references emitted by ``linux.file_watch.line``. #5467

Prior to this patch multiple files could be watched but the rule reference of last registered file
would be used for all trigger emissions causing rule enforcement to fail. References are now tracked
on a per file basis and used in trigger emissions.

Contributed by @nzlosh

3.6.0 - October 29, 2021
------------------------

Expand Down
57 changes: 32 additions & 25 deletions contrib/linux/sensors/file_watch_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.

import os

import eventlet

from logshipper.tail import Tail
Expand All @@ -27,44 +26,46 @@ def __init__(self, sensor_service, config=None):
super(FileWatchSensor, self).__init__(
sensor_service=sensor_service, config=config
)
self._trigger = None
self._logger = self._sensor_service.get_logger(__name__)
self._tail = None
self.log = self._sensor_service.get_logger(__name__)
self.tail = None
self.file_ref = {}

def setup(self):
self._tail = Tail(filenames=[])
self._tail.handler = self._handle_line
self._tail.should_run = True
self.tail = Tail(filenames=[])
self.tail.handler = self._handle_line
self.tail.should_run = True

def run(self):
self._tail.run()
self.tail.run()

def cleanup(self):
if self._tail:
self._tail.should_run = False
if self.tail:
self.tail.should_run = False

try:
self._tail.notifier.stop()
self.tail.notifier.stop()
except Exception:
self._logger.exception("Unable to stop the tail notifier")
self.log.exception("Unable to stop the tail notifier")

def add_trigger(self, trigger):
file_path = trigger["parameters"].get("file_path", None)

if not file_path:
self._logger.error('Received trigger type without "file_path" field.')
self.log.error('Received trigger type without "file_path" field.')
return

self._trigger = trigger.get("ref", None)
trigger = trigger.get("ref", None)

if not self._trigger:
raise Exception("Trigger %s did not contain a ref." % trigger)
if not trigger:
raise Exception(f"Trigger {trigger} did not contain a ref.")

# Wait a bit to avoid initialization race in logshipper library
eventlet.sleep(1.0)

self._tail.add_file(filename=file_path)
self._logger.info('Added file "%s"' % (file_path))
self.tail.add_file(filename=file_path)
self.file_ref[file_path] = trigger

self.log.info(f"Added file '{file_path}' ({trigger}) to watch list.")

def update_trigger(self, trigger):
pass
Expand All @@ -73,22 +74,28 @@ def remove_trigger(self, trigger):
file_path = trigger["parameters"].get("file_path", None)

if not file_path:
self._logger.error('Received trigger type without "file_path" field.')
self.log.error("Received trigger type without 'file_path' field.")
return

self._tail.remove_file(filename=file_path)
self._trigger = None
self.tail.remove_file(filename=file_path)
self.file_ref.pop(file_path)

self._logger.info('Removed file "%s"' % (file_path))
self.log.info(f"Removed file '{file_path}' ({trigger}) from watch list.")

def _handle_line(self, file_path, line):
trigger = self._trigger
if file_path not in self.file_ref:
self.log.error(
f"No reference found for {file_path}, unable to emit trigger!"
)
return

trigger = self.file_ref[file_path]
payload = {
"file_path": file_path,
"file_name": os.path.basename(file_path),
"line": line,
}
self._logger.debug(
"Sending payload %s for trigger %s to sensor_service.", payload, trigger
self.log.debug(
f"Sending payload {payload} for trigger {trigger} to sensor_service."
)
self.sensor_service.dispatch(trigger=trigger, payload=payload)