Skip to content

Commit 1cfbdbe

Browse files
authored
feat: support batching (#10)
1 parent 7da941e commit 1cfbdbe

File tree

6 files changed

+105
-278
lines changed

6 files changed

+105
-278
lines changed

README.md

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
python-logging-loki
2-
===================
1+
# python-logging-loki
32

43
[![PyPI version](https://img.shields.io/pypi/v/python-logging-loki.svg)](https://pypi.org/project/python-logging-loki/)
54
[![Python version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-blue.svg)](https://www.python.org/)
@@ -9,45 +8,44 @@ python-logging-loki
98
Python logging handler for Loki.
109
https://grafana.com/loki
1110

12-
Installation
13-
============
11+
# Installation
12+
1413
```bash
1514
pip install python-logging-loki
1615
```
1716

18-
Usage
19-
=====
17+
# Usage
2018

2119
```python
2220
import logging
2321
import logging_loki
2422

2523

2624
handler = logging_loki.LokiHandler(
27-
url="https://my-loki-instance/loki/api/v1/push",
25+
url="https://my-loki-instance/loki/api/v1/push",
2826
tags={"application": "my-app"},
2927
headers={"X-Scope-OrgID": "example-id"},
3028
auth=("username", "password"),
31-
version="1",
3229
props_to_labels: Optional[list[str]] = ["foo"]
3330
)
3431

3532
logger = logging.getLogger("my-logger")
3633
logger.addHandler(handler)
3734
logger.error(
38-
"Something happened",
35+
"Something happened",
3936
extra={"tags": {"service": "my-service"}},
4037
)
4138
```
4239

4340
Example above will send `Something happened` message along with these labels:
41+
4442
- Default labels from handler
4543
- Message level as `serverity`
46-
- Logger's name as `logger`
44+
- Logger's name as `logger`
4745
- Labels from `tags` item of `extra` dict
4846

4947
The given example is blocking (i.e. each call will wait for the message to be sent).
50-
But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread.
48+
But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread.
5149

5250
```python
5351
import logging.handlers
@@ -58,11 +56,10 @@ from multiprocessing import Queue
5856
queue = Queue(-1)
5957
handler = logging.handlers.QueueHandler(queue)
6058
handler_loki = logging_loki.LokiHandler(
61-
url="https://my-loki-instance/loki/api/v1/push",
59+
url="https://my-loki-instance/loki/api/v1/push",
6260
tags={"application": "my-app"},
6361
headers={"X-Scope-OrgID": "example-id"},
6462
auth=("username", "password"),
65-
version="1",
6663
props_to_labels: Optional[list[str]] = ["foo"]
6764
)
6865
logging.handlers.QueueListener(queue, handler_loki)
@@ -82,10 +79,9 @@ from multiprocessing import Queue
8279

8380
handler = logging_loki.LokiQueueHandler(
8481
Queue(-1),
85-
url="https://my-loki-instance/loki/api/v1/push",
82+
url="https://my-loki-instance/loki/api/v1/push",
8683
tags={"application": "my-app"},
8784
auth=("username", "password"),
88-
version="1",
8985
)
9086

9187
logger = logging.getLogger("my-logger")

logging_loki/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
from logging_loki.handlers import LokiQueueHandler
55

66
__all__ = ["LokiHandler", "LokiQueueHandler"]
7-
__version__ = "0.3.1"
7+
__version__ = "0.4.0-beta"
88
name = "logging_loki"

logging_loki/emitter.py

Lines changed: 14 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,22 @@
11
# -*- coding: utf-8 -*-
22

3-
import abc
43
import copy
54
import functools
65
import json
76
import logging
87
import threading
98
import time
109
from logging.config import ConvertingDict
11-
from typing import Any
12-
from typing import Dict
13-
from typing import List
14-
from typing import Optional
15-
from typing import Tuple
10+
from typing import Any, Dict, Optional, Tuple
1611

1712
import requests
18-
import rfc3339
1913

2014
from logging_loki import const
2115

2216
BasicAuth = Optional[Tuple[str, str]]
2317

2418

25-
class LokiEmitter(abc.ABC):
19+
class LokiEmitter:
2620
"""Base Loki emitter class."""
2721

2822
success_response_code = const.success_response_code
@@ -72,16 +66,16 @@ def __call__(self, record: logging.LogRecord, line: str):
7266
return
7367
try:
7468
payload = self.build_payload(record, line)
75-
resp = self.session.post(self.url, json=payload, headers=self.headers)
76-
if resp.status_code != self.success_response_code:
77-
raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code))
69+
self._post_to_loki(payload)
7870
finally:
7971
self._lock.release()
8072

81-
@abc.abstractmethod
82-
def build_payload(self, record: logging.LogRecord, line) -> dict:
83-
"""Build JSON payload with a log entry."""
84-
raise NotImplementedError # pragma: no cover
73+
def _post_to_loki(self, payload: dict):
74+
resp = self.session.post(self.url, json=payload, headers=self.headers)
75+
# TODO: Enqueue logs instead of raising an error and losing the logs
76+
if resp.status_code != self.success_response_code:
77+
raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code))
78+
8579

8680
@property
8781
def session(self) -> requests.Session:
@@ -127,33 +121,6 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
127121

128122
return tags
129123

130-
131-
class LokiEmitterV0(LokiEmitter):
132-
"""Emitter for Loki < 0.4.0."""
133-
134-
def build_payload(self, record: logging.LogRecord, line) -> dict:
135-
"""Build JSON payload with a log entry."""
136-
labels = self.build_labels(record)
137-
ts = rfc3339.format_microsecond(record.created)
138-
stream = {
139-
"labels": labels,
140-
"entries": [{"ts": ts, "line": line}],
141-
}
142-
return {"streams": [stream]}
143-
144-
def build_labels(self, record: logging.LogRecord) -> str:
145-
"""Return Loki labels string."""
146-
labels: List[str] = []
147-
for label_name, label_value in self.build_tags(record).items():
148-
cleared_name = self.format_label(str(label_name))
149-
cleared_value = str(label_value).replace('"', r"\"")
150-
labels.append('{0}="{1}"'.format(cleared_name, cleared_value))
151-
return "{{{0}}}".format(",".join(labels))
152-
153-
154-
class LokiEmitterV1(LokiEmitter):
155-
"""Emitter for Loki >= 0.4.0."""
156-
157124
def build_payload(self, record: logging.LogRecord, line) -> dict:
158125
"""Build JSON payload with a log entry."""
159126
labels = self.build_tags(record)
@@ -167,3 +134,8 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
167134
"values": [[ts, line]],
168135
}
169136
return {"streams": [stream]}
137+
138+
def emit_batch(self, records: list[Tuple[logging.LogRecord, str]]):
139+
"""Send log records to Loki."""
140+
streams = [self.build_payload(record[0], record[1])["streams"][0] for record in records]
141+
self._post_to_loki({"streams": streams})

logging_loki/handlers.py

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,36 @@
11
# -*- coding: utf-8 -*-
22

33
import logging
4-
import warnings
5-
from logging.handlers import QueueHandler
4+
from logging.handlers import MemoryHandler, QueueHandler
65
from logging.handlers import QueueListener
6+
import os
77
from queue import Queue
8-
from typing import Dict
9-
from typing import Optional
10-
from typing import Type
8+
import time
9+
from typing import Optional, Union
1110

12-
from logging_loki import const
13-
from logging_loki import emitter
11+
from logging_loki.emitter import BasicAuth, LokiEmitter
1412

13+
LOKI_MAX_BATCH_BUFFER_SIZE = int(os.environ.get('LOKI_MAX_BATCH_BUFFER_SIZE', 10))
1514

1615
class LokiQueueHandler(QueueHandler):
1716
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""
1817

19-
def __init__(self, queue: Queue, **kwargs):
18+
handler: Union['LokiBatchHandler', 'LokiHandler']
19+
20+
def __init__(self, queue: Queue, batch_interval: Optional[float] = None, **kwargs):
2021
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
2122
super().__init__(queue)
22-
self.handler = LokiHandler(**kwargs) # noqa: WPS110
23+
24+
loki_handler = LokiHandler(**kwargs) # noqa: WPS110
25+
self.handler = LokiBatchHandler(batch_interval, target=loki_handler) if batch_interval else loki_handler
26+
2327
self.listener = QueueListener(self.queue, self.handler)
2428
self.listener.start()
2529

30+
def flush(self) -> None:
31+
super().flush()
32+
self.handler.flush()
33+
2634
def __del__(self):
2735
self.listener.stop()
2836

@@ -33,20 +41,16 @@ class LokiHandler(logging.Handler):
3341
`Loki API <https://github.com/grafana/loki/blob/master/docs/api.md>`_
3442
"""
3543

36-
emitters: Dict[str, Type[emitter.LokiEmitter]] = {
37-
"0": emitter.LokiEmitterV0,
38-
"1": emitter.LokiEmitterV1,
39-
}
44+
emitter: LokiEmitter
4045

4146
def __init__(
4247
self,
4348
url: str,
4449
tags: Optional[dict] = None,
4550
headers: Optional[dict] = None,
46-
auth: Optional[emitter.BasicAuth] = None,
47-
version: Optional[str] = None,
51+
auth: Optional[BasicAuth] = None,
4852
as_json: Optional[bool] = False,
49-
props_to_labels: Optional[list[str]] = None
53+
props_to_labels: Optional[list[str]] = None,
5054
):
5155
"""
5256
Create new Loki logging handler.
@@ -55,24 +59,13 @@ def __init__(
5559
url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`).
5660
tags: Default tags added to every log record.
5761
auth: Optional tuple with username and password for basic HTTP authentication.
58-
version: Version of Loki emitter to use.
62+
headers: Optional record with headers that are send with each POST to loki.
63+
as_json: Flag to support sending entire JSON record instead of only the message.
64+
props_to_labels: List of properties that should be converted to loki labels.
5965
6066
"""
6167
super().__init__()
62-
63-
if version is None and const.emitter_ver == "0":
64-
msg = (
65-
"Loki /api/prom/push endpoint is in the depreciation process starting from version 0.4.0.",
66-
"Explicitly set the emitter version to '0' if you want to use the old endpoint.",
67-
"Or specify '1' if you have Loki version> = 0.4.0.",
68-
"When the old API is removed from Loki, the handler will use the new version by default.",
69-
)
70-
warnings.warn(" ".join(msg), DeprecationWarning)
71-
72-
version = version or const.emitter_ver
73-
if version not in self.emitters:
74-
raise ValueError("Unknown emitter version: {0}".format(version))
75-
self.emitter = self.emitters[version](url, tags, headers, auth, as_json, props_to_labels)
68+
self.emitter = LokiEmitter(url, tags, headers, auth, as_json, props_to_labels)
7669

7770
def handleError(self, record): # noqa: N802
7871
"""Close emitter and let default handler take actions on error."""
@@ -86,3 +79,38 @@ def emit(self, record: logging.LogRecord):
8679
self.emitter(record, self.format(record))
8780
except Exception:
8881
self.handleError(record)
82+
83+
def emit_batch(self, records: list[logging.LogRecord]):
84+
"""Send a batch of log records to Loki."""
85+
# noinspection PyBroadException
86+
try:
87+
self.emitter.emit_batch([(record, self.format(record)) for record in records])
88+
except Exception:
89+
for record in records:
90+
self.handleError(record)
91+
92+
class LokiBatchHandler(MemoryHandler):
93+
interval: float # The interval at which batched logs are sent in seconds
94+
_last_flush_time: float
95+
target: LokiHandler
96+
97+
def __init__(self, interval: float, capacity: int = LOKI_MAX_BATCH_BUFFER_SIZE, **kwargs):
98+
super().__init__(capacity, **kwargs)
99+
self.interval = interval
100+
self._last_flush_time = time.time()
101+
102+
def flush(self) -> None:
103+
self.acquire()
104+
try:
105+
if self.target and self.buffer:
106+
self.target.emit_batch(self.buffer)
107+
self.buffer.clear()
108+
finally:
109+
self._last_flush_time = time.time()
110+
self.release()
111+
112+
def shouldFlush(self, record: logging.LogRecord) -> bool:
113+
return (
114+
super().shouldFlush(record) or
115+
(time.time() - self._last_flush_time >= self.interval)
116+
)

0 commit comments

Comments
 (0)