Skip to content

Restructure influxdb3 /plugins for step-by-step clarity #5963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 63 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
6217825
docs: restructuring introduction and adding a begin section
MeelahMe Apr 4, 2025
524bcf9
docs: updating setp up section and adding more direction
MeelahMe Apr 4, 2025
c030f68
docs: revising introduction and restructuring the first part of addin…
MeelahMe Apr 4, 2025
2396ba1
docs: updating Add a Processing engine plugin section
MeelahMe Apr 7, 2025
0f853ec
docs: revising Create a custom plugin section
MeelahMe Apr 7, 2025
dd098c3
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 7, 2025
c652fcd
docs: updating before you begin section and step three
MeelahMe Apr 7, 2025
c441831
Merge branch '5951-restructure-influxdb3-plugins' of github.com:influ…
MeelahMe Apr 7, 2025
3572d79
WIP: porting over extened plugins with api feature
MeelahMe Apr 7, 2025
797c74f
docs: updating gh triggers
MeelahMe Apr 9, 2025
0a66059
docs: updating configure error handling and python dependencies
MeelahMe Apr 9, 2025
d7c63d5
docs: first read through for mistakes
MeelahMe Apr 9, 2025
2e2da73
WIP: Setting up file and path structure and confirming functionality
MeelahMe Apr 9, 2025
1be8918
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 11, 2025
7350053
adding extebded-plugin.md to core
MeelahMe Apr 14, 2025
098942e
updates to extended-plugin.md in core
MeelahMe Apr 14, 2025
6ec728a
updating enterprise extended-plugin.md
MeelahMe Apr 14, 2025
6394812
Checking file paths and dropdowns
MeelahMe Apr 14, 2025
e568a12
docs: updating introduction sections for extended plugins
MeelahMe Apr 14, 2025
ce3974c
updating and adding a get started section
MeelahMe Apr 14, 2025
8bea526
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 14, 2025
149b175
docs: finishing up resturcting and did first proofread
MeelahMe Apr 15, 2025
706074f
Merge branch '5951-restructure-influxdb3-plugins' of github.com:influ…
MeelahMe Apr 15, 2025
33e9e9d
Update content/shared/v3-core-plugins/extended-plugin-api.md
MeelahMe Apr 15, 2025
10c82da
Update content/influxdb3/core/extended-plugin.md
MeelahMe Apr 15, 2025
e90656e
Update content/influxdb3/core/extended-plugin.md
MeelahMe Apr 15, 2025
2e11cba
Update content/influxdb3/enterprise/extended-plugin.md
MeelahMe Apr 15, 2025
c1a1ba8
Update content/influxdb3/enterprise/extended-plugin.md
MeelahMe Apr 15, 2025
22c9617
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 15, 2025
8f543a6
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 15, 2025
851fa2c
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 15, 2025
811cf2d
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 15, 2025
f65b337
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 15, 2025
e4eb00c
Update content/shared/v3-core-plugins/extended-plugin-api.md
MeelahMe Apr 15, 2025
464a816
Update content/shared/v3-core-plugins/extended-plugin-api.md
MeelahMe Apr 15, 2025
c099310
Update content/shared/influxdb3-processing-engine.md
MeelahMe Apr 16, 2025
78d8a43
updating cache integration
MeelahMe Apr 16, 2025
43922af
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 16, 2025
29d9186
update to cache integrations
MeelahMe Apr 16, 2025
f3f6198
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 18, 2025
8ab968a
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 21, 2025
6c565a9
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 23, 2025
e666d86
Merge branch '5951-restructure-influxdb3-plugins' of github.com:influ…
MeelahMe Apr 25, 2025
8f4d914
Update content/shared/v3-core-plugins/extended-plugin-api.md
MeelahMe Apr 25, 2025
a56a8d8
updating file paths
MeelahMe Apr 25, 2025
94d99d9
Merge branch '5951-restructure-influxdb3-plugins' of github.com:influ…
MeelahMe Apr 25, 2025
26084af
Update content/influxdb3/core/extended-plugin.md
MeelahMe Apr 28, 2025
39722d5
Update content/influxdb3/enterprise/extended-plugin.md
MeelahMe Apr 28, 2025
391538f
Update content/influxdb3/enterprise/extended-plugin.md
MeelahMe Apr 28, 2025
88c7cb0
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
63aeab7
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
377604e
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
a775edd
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
d3af0d4
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
4825b91
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
740dd2b
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
2f77fca
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
571c3e5
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
9ac78d9
updating file names
MeelahMe Apr 28, 2025
0ad66f5
creating a linked TOC
MeelahMe Apr 28, 2025
1355445
Sharing a link to system table information
MeelahMe Apr 28, 2025
dac34a4
Merge branch 'master' into 5951-restructure-influxdb3-plugins
MeelahMe Apr 28, 2025
f742d52
Update content/shared/extended-plugin-api.md
MeelahMe Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions content/influxdb3/core/extend-plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
title: Extend plugins with API features and state management
description: |
The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
menu:
influxdb3_core:
name: Extended plugins
parent: Processing engine and Python plugins
weight: 4
influxdb3/core/tags: [processing engine, plugins, API, python]
source: /shared/extended-plugin-api.md
---

<!--
// SOURCE content/shared/extended-plugin-api.md
-->

16 changes: 16 additions & 0 deletions content/influxdb3/enterprise/extend-plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
title: Extend plugins with API features and state management
description: |
The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
menu:
influxdb3_enterprise:
name: Extend plugins
parent: Processing engine and Python plugins
weight: 4
influxdb3/enterprise/tags: [processing engine, plugins, API, python]
source: /shared/extended-plugin-api.md
---

<!--
// SOURCE content/shared/extended-plugin-api.md
-->
2 changes: 1 addition & 1 deletion content/influxdb3/enterprise/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: |
code on different events in an {{< product-name >}} instance.
menu:
influxdb3_enterprise:
name: Processing Engine and Python plugins
name: Processing engine and Python plugins
weight: 4
influxdb3/enterprise/tags: [processing engine, python]
related:
Expand Down
313 changes: 313 additions & 0 deletions content/shared/extended-plugin-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
The Processing engine includes an API that allows your plugins to
interact with your data, build and write data in line protocol format, and maintain state between executions.
These features let you build plugins that can transform, analyze, and respond to data.

The plugin API lets you:

- [Write and query data](#write-and-query-data)
- [Log messages for monitoring and debugging](#log-messages-for-monitoring-and-debugging)
- [Maintain state with in-memory cache](#maintain-state-with-in-memory-cache)
- [Guidelines for in-memory caching](#guidelines-for-in-memory-caching)

### Get started with the shared API

Every plugin has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries to use the API. It's available as soon as your plugin runs.

### Write and query data

#### Write data

To write data into your database use the `LineBuilder` API to create line protocol data:

```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```

Your writes are buffered while the plugin runs and are flushed when the plugin completes.

{{% expand-wrapper %}}
{{% expand "View the `LineBuilder` Python implementation" %}}

```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
"""Base exception for InfluxDB-related errors"""
pass

class InvalidMeasurementError(InfluxDBError):
"""Raised when measurement name is invalid"""
pass

class InvalidKeyError(InfluxDBError):
"""Raised when a tag or field key is invalid"""
pass

class InvalidLineError(InfluxDBError):
"""Raised when a line protocol string is invalid"""
pass

class LineBuilder:
def __init__(self, measurement: str):
if ' ' in measurement:
raise InvalidMeasurementError("Measurement name cannot contain spaces")
self.measurement = measurement
self.tags: OrderedDict[str, str] = OrderedDict()
self.fields: OrderedDict[str, str] = OrderedDict()
self._timestamp_ns: Optional[int] = None

def _validate_key(self, key: str, key_type: str) -> None:
"""Validate that a key does not contain spaces, commas, or equals signs."""
if not key:
raise InvalidKeyError(f"{key_type} key cannot be empty")
if ' ' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
if ',' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
if '=' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

def tag(self, key: str, value: str) -> 'LineBuilder':
"""Add a tag to the line protocol."""
self._validate_key(key, "tag")
self.tags[key] = str(value)
return self

def uint64_field(self, key: str, value: int) -> 'LineBuilder':
"""Add an unsigned integer field to the line protocol."""
self._validate_key(key, "field")
if value < 0:
raise ValueError(f"uint64 field '{key}' cannot be negative")
self.fields[key] = f"{value}u"
return self

def int64_field(self, key: str, value: int) -> 'LineBuilder':
"""Add an integer field to the line protocol."""
self._validate_key(key, "field")
self.fields[key] = f"{value}i"
return self

def float64_field(self, key: str, value: float) -> 'LineBuilder':
"""Add a float field to the line protocol."""
self._validate_key(key, "field")
# Check if value has no decimal component
self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
return self

def string_field(self, key: str, value: str) -> 'LineBuilder':
"""Add a string field to the line protocol."""
self._validate_key(key, "field")
# Escape quotes and backslashes in string values
escaped_value = value.replace('"', '\\"').replace('\\', '\\\\')
self.fields[key] = f'"{escaped_value}"'
return self

def bool_field(self, key: str, value: bool) -> 'LineBuilder':
"""Add a boolean field to the line protocol."""
self._validate_key(key, "field")
self.fields[key] = 't' if value else 'f'
return self

def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
"""Set the timestamp in nanoseconds."""
self._timestamp_ns = timestamp_ns
return self

def build(self) -> str:
"""Build the line protocol string."""
# Start with measurement name (escape commas only)
line = self.measurement.replace(',', '\\,')

# Add tags if present
if self.tags:
tags_str = ','.join(
f"{k}={v}" for k, v in self.tags.items()
)
line += f",{tags_str}"

# Add fields (required)
if not self.fields:
raise InvalidLineError(f"At least one field is required: {line}")

fields_str = ','.join(
f"{k}={v}" for k, v in self.fields.items()
)
line += f" {fields_str}"

# Add timestamp if present
if self._timestamp_ns is not None:
line += f" {self._timestamp_ns}"

return line
```
{{% /expand %}}
{{% /expand-wrapper %}}

#### Query data

Your plugins can execute SQL queries and process results directly:

```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```

Query results are a `List` of `Dict[String, Any]`, where each dictionary represents a row with column names as keys and column values as values.

### Log messages for monitoring and debugging

The shared API `info`, `warn`, and `error` functions accept multiple arguments,
convert them to strings, and log them as a space-separated message to the database log.

Add logging to track plugin execution:

```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```
All log messages are written to the server logs and stored in [system tables](/influxdb3/core/reference/cli/influxdb3/show/system/summary/), where you can query them using SQL.
### Maintain state with in-memory cache

The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.

You can access the cache through the `cache` property of the shared API:

```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```

`cache` provides the following methods to retrieve and manage cached values:

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |

##### Understanding cache namespaces

The cache system offers two distinct namespaces:

| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |

### Common cache operations
Here are some examples of how to use the cache in your plugins

##### Store and retrieve cached data

```python
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```

##### Store cached data with expiration

```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```

##### Share data across plugins

```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
#### Building a counter

You can track how many times a plugin has run:

```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```

### Guidelines for in-memory caching

To get the most out of the in-memory cache, follow these guidelines:

- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Cache computation results](#cache-computation-results)
- [Warm the cache](#warm-the-cache)
- [Consider cache limitations](#consider-cache-limitations)

##### Use the trigger-specific namespace

The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.

##### Use TTL appropriately

Set realistic expiration times based on how frequently data changes:

```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```

##### Cache computation results

Store the results of expensive calculations that need to be utilized frequently:

```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```

##### Warm the cache

For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:

```python
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
influxdb3_local.cache.put("lookup_table", load_lookup_data())
```

##### Consider cache limitations

- **Memory Usage**: Since cache contents are stored in memory, monitor your memory usage when caching large datasets.
- **Server Restarts**: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above).
- **Concurrency**: Be cautious of accessing inaccurate or out-of-date data when multiple trigger instances might simultaneously update the same cache key.

### Next Steps

With an understanding of the InfluxDB 3 Shared Plugin API, you're ready to build data processing workflows that can transform, analyze, and respond to your time series data or extend example plugins from the [plugin repo]() on GitHub.
Loading