diff --git a/content/influxdb3/core/extend-plugin.md b/content/influxdb3/core/extend-plugin.md
new file mode 100644
index 0000000000..8f8516f9a8
--- /dev/null
+++ b/content/influxdb3/core/extend-plugin.md
@@ -0,0 +1,17 @@
+---
+title: Extend plugins with API features and state management
+description: |
+  The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
+menu:
+  influxdb3_core:
+    name: Extend plugins
+    parent: Processing engine and Python plugins
+weight: 4
+influxdb3/core/tags: [processing engine, plugins, API, python]
+source: /shared/extended-plugin-api.md
+---
+
+
diff --git a/content/influxdb3/enterprise/extend-plugin.md b/content/influxdb3/enterprise/extend-plugin.md
new file mode 100644
index 0000000000..c4752fa97f
--- /dev/null
+++ b/content/influxdb3/enterprise/extend-plugin.md
@@ -0,0 +1,16 @@
+---
+title: Extend plugins with API features and state management
+description: |
+  The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
+menu:
+  influxdb3_enterprise:
+    name: Extend plugins
+    parent: Processing engine and Python plugins
+weight: 4
+influxdb3/enterprise/tags: [processing engine, plugins, API, python]
+source: /shared/extended-plugin-api.md
+---
+
\ No newline at end of file
diff --git a/content/influxdb3/enterprise/plugins.md b/content/influxdb3/enterprise/plugins.md
index 73ad8c3c3f..6862a163fe 100644
--- a/content/influxdb3/enterprise/plugins.md
+++ b/content/influxdb3/enterprise/plugins.md
@@ -5,7 +5,7 @@ description: |
   code on different events in an {{< product-name >}} instance.
 menu:
   influxdb3_enterprise:
-    name: Processing Engine and Python plugins
+    name: Processing engine and Python plugins
 weight: 4
 influxdb3/enterprise/tags: [processing engine, python]
 related:
diff --git a/content/shared/extended-plugin-api.md b/content/shared/extended-plugin-api.md
new file mode 100644
index 0000000000..8e65ec07a0
--- /dev/null
+++ b/content/shared/extended-plugin-api.md
@@ -0,0 +1,313 @@
+The Processing engine includes an API that allows your plugins to
+interact with your data, build and write data in line protocol format, and maintain state between executions.
+These features let you build plugins that can transform, analyze, and respond to data.

The plugin API lets you:

- [Write and query data](#write-and-query-data)
- [Log messages for monitoring and debugging](#log-messages-for-monitoring-and-debugging)
- [Maintain state with in-memory cache](#maintain-state-with-in-memory-cache)
- [Follow guidelines for in-memory caching](#guidelines-for-in-memory-caching)

### Get started with the shared API

Every plugin has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries to use the API; it's available as soon as your plugin runs.

### Write and query data

#### Write data

To write data into your database, use the `LineBuilder` API to create line protocol data:

```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```

Your writes are buffered while the plugin runs and are flushed when the plugin completes.
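For example, here is a minimal sketch that uses the same `LineBuilder` methods shown in the implementation below to write a point with multiple tags and fields; the measurement, tag, and field names are illustrative:

```python
# Build a line with several tags and fields; build() returns the
# line protocol string if you want to inspect or log it first
line = LineBuilder("home")
line.tag("room", "kitchen")
line.tag("sensor", "a1")
line.float64_field("temp", 21.5)
line.int64_field("co2", 447)
line.time_ns(1735545600000000000)

influxdb3_local.info("Writing line protocol:", line.build())
influxdb3_local.write(line)
```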
{{% expand-wrapper %}}
{{% expand "View the `LineBuilder` Python implementation" %}}

```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so the backslashes
        # added while escaping quotes aren't escaped a second time
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"
        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line
```
{{% /expand %}}
{{% /expand-wrapper %}}

#### Query data

Your plugins can execute SQL queries and process results directly:

```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```

Query results are a `List` of `Dict[str, Any]`, where each dictionary represents a row with column names as keys and column values as values.
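For example, here is a minimal sketch of processing those rows; the `metrics` table and its `host` and `value` columns are illustrative:

```python
# Each row is a dictionary keyed by column name
results = influxdb3_local.query("SELECT host, value FROM metrics")

for row in results:
    if row["value"] > 90:
        influxdb3_local.warn("High value for host", row["host"], row["value"])
```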
### Log messages for monitoring and debugging

The shared API `info`, `warn`, and `error` functions accept multiple arguments,
convert them to strings, and log them as a space-separated message to the database log.

Add logging to track plugin execution:

```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```

All log messages are written to the server logs and stored in [system tables](/influxdb3/version/reference/cli/influxdb3/show/system/summary/), where you can query them using SQL.

### Maintain state with in-memory cache

The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.

You can access the cache through the `cache` property of the shared API:

```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```

`cache` provides the following methods to retrieve and manage cached values:

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |

#### Understanding cache namespaces

The cache system offers two distinct namespaces:

| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |

### Common cache operations

Here are some examples of how to use the cache in your plugins:

#### Store and retrieve cached data

```python
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```

#### Store cached data with expiration

```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```

#### Share data across plugins

```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```

#### Building a counter

You can track how many times a plugin has run:

```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```

### Guidelines for in-memory caching

To get the most out of the in-memory cache, follow these guidelines:

- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Cache computation results](#cache-computation-results)
- [Warm the cache](#warm-the-cache)
- [Consider cache limitations](#consider-cache-limitations)

#### Use the trigger-specific namespace

The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations, and use the global namespace only when data sharing across triggers is necessary.

#### Use TTL appropriately

Set realistic expiration times based on how frequently data changes:

```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```

#### Cache computation results

Store the results of expensive calculations that are used frequently:

```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```

#### Warm the cache

For critical data, prime the cache at startup. This can be especially useful for global namespace data that multiple triggers need:

```python
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```

#### Consider cache limitations

- **Memory usage**: Since cache contents are stored in memory, monitor your memory usage when caching large datasets.
- **Server restarts**: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above).
- **Concurrency**: Be cautious of accessing inaccurate or out-of-date data when multiple trigger instances might simultaneously update the same cache key, as the sketch below shows.
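The concurrency caveat comes from read-modify-write sequences not being atomic. Here is a minimal sketch of the failure mode, using the same `get` and `put` calls as the counter example above:

```python
# Not atomic: two trigger instances running concurrently can both
# read the same value, and one increment is lost when the second
# put() overwrites the first
count = influxdb3_local.cache.get("event_count", default=0)
count += 1
influxdb3_local.cache.put("event_count", count)
```

Where possible, keep keys trigger-specific or design your plugin logic to tolerate an occasional stale read.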
### Next steps

With an understanding of the shared plugin API, you're ready to build data processing workflows that can transform, analyze, and respond to your time series data. To get started quickly, extend one of the example plugins in the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) on GitHub.
\ No newline at end of file
diff --git a/content/shared/v3-core-plugins/_index.md b/content/shared/v3-core-plugins/_index.md
index 2ab099cd97..8cdc5f8427 100644
--- a/content/shared/v3-core-plugins/_index.md
+++ b/content/shared/v3-core-plugins/_index.md
@@ -1,23 +1,35 @@
-Use the InfluxDB 3 Processing engine to run Python code directly in your
-{{% product-name %}} database to automatically process data and respond to database events.
+Extend InfluxDB 3 with custom Python code that responds to database events. The Processing engine lets you automate workflows, transform data, and create API endpoints directly within your {{% product-name %}} database.

-The Processing engine is an embedded Python VM that runs inside your InfluxDB 3 database and lets you:
+## What is the Processing engine?

-- Process data as it's written to the database
-- Run code on a schedule
-- Create API endpoints that execute Python code
-- Maintain state between executions with an in-memory cache
+The Processing engine is an embedded Python virtual machine that runs inside your InfluxDB 3 database. It executes Python code in response to:

-Learn how to create, configure, run, and extend Python plugins that execute when specific events occur.
+- **Data writes** - Process and transform data as it enters the database
+- **Scheduled events** - Run code at specific intervals or times
+- **HTTP requests** - Create custom API endpoints that execute your code
+
+The engine maintains state between executions using an in-memory cache, allowing you to build stateful applications directly in your database.
+
+This guide shows you how to set up the Processing engine, create your first plugin, and configure triggers that execute your code when specific events occur.
+
+## Before you begin
+
+Ensure you have:
+
+- A working {{% product-name %}} instance
+- Access to the command line
+- Python installed if you're writing your own plugin
+- Basic knowledge of the InfluxDB CLI
+
+Once you have the prerequisites in place, follow these steps to set up and use the Processing engine.

 1. [Set up the Processing engine](#set-up-the-processing-engine)
 2. [Add a Processing engine plugin](#add-a-processing-engine-plugin)
-   - [Get example plugins](#get-example-plugins)
-   - [Create a plugin](#create-a-plugin)
+   - [Use example plugins](#use-example-plugins)
+   - [Create a custom plugin](#create-a-custom-plugin)
 3. 
[Create a trigger to run a plugin](#create-a-trigger-to-run-a-plugin)
-   - [Create a trigger for data writes](#create-a-trigger-for-data-writes)
-   - [Create a trigger for scheduled events](#create-a-trigger-for-scheduled-events)
-   - [Create a trigger for HTTP requests](#create-a-trigger-for-http-requests)
+   - [Understand trigger types](#understand-trigger-types)
+   - [Create a trigger](#create-a-trigger)
+   - [Choose a trigger specification](#choose-a-trigger-specification)
   - [Use community plugins from GitHub](#use-community-plugins-from-github)
   - [Pass arguments to plugins](#pass-arguments-to-plugins)
   - [Control trigger execution](#control-trigger-execution)
@@ -27,7 +39,7 @@ Learn how to create, configure, run, and extend Python plugins that execute when

 ## Set up the Processing engine

-To enable the Processing engine, start your InfluxDB server with the `--plugin-dir` option:
+To enable the Processing engine, start your InfluxDB server with the `--plugin-dir` flag to specify where your plugin files are stored:

```bash
influxdb3 serve \
@@ -36,9 +48,12 @@ influxdb3 serve \
 --plugin-dir /path/to/plugins
```

-Replace `/path/to/plugins` with the directory where you want to store your Python plugin files. All plugin files must be located in this directory or its subdirectories.
-
+Replace the following:
+
+- `<node-id>` with a unique identifier for your instance
+- `<object-store>` with the type of object store (for example: file, memory, or s3)
+- `/path/to/plugins` with the directory where you want to store your Python plugin files. All plugin files must be located in this directory or its subdirectories.

 ### Configure distributed environments

@@ -57,43 +72,47 @@ If you're running multiple {{% product-name %}} instances (distributed deploymen
 >
 > Configure your plugin directory on the same system as the nodes that run the triggers and plugins.

-
-## Add a Processing engine plugin
-
-A plugin is a Python file that contains a specific function signature that corresponds to a trigger type.
-Plugins:
+The plugin directory must exist before you start InfluxDB.

-- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
-- Can receive keyword arguments (as `args`) from _trigger arguments_
-- Can access the `influxdb3_local` shared API for writing, querying, and managing state
+## Add a Processing engine plugin

-Get started using example plugins or create your own:
+A plugin is a Python file that contains a specific function signature that corresponds to a trigger type. InfluxData maintains a repository of contributed plugins that you can use as-is or as a starting point for your own plugin.

-- [Get example plugins](#get-example-plugins)
-- [Create a plugin](#create-a-plugin)
+You have two main options for adding plugins to your InfluxDB instance:

-### Get example plugins
+- [Use example plugins](#use-example-plugins) - the quickest way to get started
+- [Create a custom plugin](#create-a-custom-plugin) - for custom functionality

-InfluxData maintains a repository of contributed plugins that you can use as-is or as a starting point for your own plugin.
+### Use example plugins

-#### From local files
+The InfluxData team maintains a repository of example plugins you can use immediately:

-You can copy example plugins from the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to your local plugin directory:

1. 
**Browse available plugins**: Visit the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to find examples for:
   - **Data transformation**: Process and transform incoming data
   - **Alerting**: Send notifications based on data thresholds
   - **Aggregation**: Calculate statistics on time series data
   - **Integration**: Connect to external services and APIs
   - **System monitoring**: Track resource usage and health metrics
2. **Choose how to access plugins**:

**Option A: Copy plugins to your local directory**

```bash
# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git

# Copy a plugin to your configured plugin directory
cp influxdb3_plugins/examples/schedule/system_metrics/system_metrics.py /path/to/plugins/
```

-#### Directly from GitHub
+**Option B: Use plugins directly from GitHub**

You can use plugins directly from GitHub without downloading them first by using the `gh:` prefix in the plugin filename:

```bash
-# Use a plugin directly from GitHub
influxdb3 create trigger \
  --trigger-spec "every:1m" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database my_database \
  system_metrics
```

-> [!Note]
-> #### Find and contribute plugins
->
-> The plugins repository includes examples for various use cases:
->
-> - **Data transformation**: Process and transform incoming data
-> - **Alerting**: Send notifications based on data thresholds
-> - **Aggregation**: Calculate statistics on time series data
-> - **Integration**: Connect to external services and APIs
-> - **System monitoring**: Track resource usage and health metrics
->
-> Visit [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins)
-> to browse available plugins or contribute your own.
+Plugins can:
+
+- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
+- Receive keyword arguments (as `args`) from _trigger arguments_
+- Access the `influxdb3_local` shared API for writing, querying, and managing state

### Create a custom plugin

When you need custom functionality, you can create your own plugin by doing the following:

#### Step 1: Choose your plugin type

First, determine which type of plugin you need based on your automation goals:

| Plugin Type | Best For | Trigger Type |
|-------------|----------|-------------|
| **Data write** | Processing data as it arrives | `table:<table_name>` or `all_tables` |
| **Scheduled** | Running code at specific times | `every:<duration>` or `cron:<expression>` |
| **HTTP request** | Creating API endpoints | `request:<endpoint>` |

#### Step 2: Create your plugin file

1. Create a `.py` file in your plugins directory
2. Add the appropriate function signature based on your chosen plugin type
3. Implement your processing logic inside the function

##### Option A: Create a data write plugin

Data write plugins process incoming data as it's written to the database.
They're ideal for:

- Data transformation and enrichment
- Alerting on incoming values
- Creating derived metrics

```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]

        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")

        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)
```

-#### For scheduled events
+##### Option B: Create a scheduled plugin
+
+Scheduled plugins run at specific intervals or times. They can be used for:
+
+- Periodic data aggregation
+- Report generation
+- System health checks

```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Run code on a schedule

    # Query recent data
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

    # Process the results
    if results:
        influxdb3_local.info(f"Found {len(results)} recent metrics")
    else:
        influxdb3_local.warn("No recent metrics found")
```

-#### For HTTP requests
+##### Option C: Create an HTTP request plugin
+
+HTTP request plugins respond to API calls. They can be used for:
+
+- Creating custom API endpoints
+- Webhooks for external integrations
+- User interfaces for data interaction

```python
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Handle HTTP requests to a custom endpoint

    # Log the request parameters
    influxdb3_local.info(f"Received request with parameters: {query_parameters}")

    # Process the request body
    if request_body:
        import json
        data = json.loads(request_body)
        influxdb3_local.info(f"Request data: {data}")

    # Return a response (automatically converted to JSON)
    return {"status": "success", "message": "Request processed"}
```

-After adding your plugin, you can [install Python dependencies](#install-python-dependencies) or learn how to [extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management).
+#### Step 3: Next steps
+
+After adding your plugin, you can:
+
+- [Install Python dependencies](#install-python-dependencies)
+- Learn how to [extend plugins with API features and state management](/influxdb3/version/extend-plugin/)
+- Create a trigger to connect your plugin to database events

## Create a trigger to run a plugin

A trigger connects your plugin to a specific database event.
The plugin function signature in your plugin file determines which _trigger specification_
you can choose for configuring and activating your plugin.

-Create a trigger with the `influxdb3 create trigger` command.
+After setting up your plugin, you need to connect it to specific database events using triggers.
+
+### Understand trigger types
+
+| Plugin Type | Trigger Specification | When Plugin Runs |
+|------------|----------------------|-----------------|
+| Data write | `table:<table_name>` or `all_tables` | When data is written to tables |
+| Scheduled | `every:<duration>` or `cron:<expression>` | At specified time intervals |
+| HTTP request | `request:<endpoint>` | When HTTP requests are received |
+
+### Create a trigger
+
+Use the `influxdb3 create trigger` command with the appropriate trigger specification:
+
+```bash
+influxdb3 create trigger \
+  --trigger-spec "<trigger_specification>" \
+  --plugin-filename "<plugin_filename>" \
+  --database <database_name> \
+  <trigger_name>
+```

> [!Note]
> When specifying a local plugin file, the `--plugin-filename` parameter
> _is relative to_ the `--plugin-dir` configured for the server.
> You don't need to provide an absolute path.

-### Create a trigger for data writes
+### Choose a trigger specification

-Use the `table:` or the `all_tables` trigger specification to configure
-and run a [plugin for data write events](#for-data-write-events)--for example:
+#### Option A: For data write events

```bash
# Trigger on writes to a specific table
@@ -216,10 +283,7 @@ to the Write-Ahead Log (WAL) in the Object store (default is every second).

 The plugin receives the written data and table information.
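To verify a data write trigger end to end, you can write a test point and watch for the plugin's log output. The following sketch assumes a database named `my_database`, a trigger on the `cpu` table, and that your `influxdb3 write` command accepts line protocol on stdin:

```bash
# Write a test point; the trigger fires after the data is
# flushed to the WAL (default: every second)
echo "cpu,host=server01 usage=87.2" | influxdb3 write --database my_database
```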
-### Create a trigger for scheduled events
-
-Use the `every:` or the `cron:` trigger specification
-to configure and run a [plugin for scheduled events](#for-scheduled-events)--for example:
+#### Option B: For scheduled events

```bash
# Run every 5 minutes
influxdb3 create trigger \
@@ -239,9 +303,7 @@ influxdb3 create trigger \

 The plugin receives the scheduled call time.

-### Create a trigger for HTTP requests
-
-For an [HTTP request plugin](#for-http-requests), use the `request:` trigger specification to configure and enable a [plugin for HTTP requests](#for-http-requests)--for example:
+#### Option C: For HTTP requests

```bash
# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
@@ -252,7 +314,7 @@ influxdb3 create trigger \
 webhook_processor
```

-The trigger makes your endpoint available at `/api/v3/engine/`.
+Access your endpoint at `/api/v3/engine/`.
 To run the plugin, send a `GET` or `POST` request to the endpoint--for example:

```bash
@@ -344,299 +406,88 @@ influxdb3 create trigger \
 auto_disable_processor
```

-## Extend plugins with API features and state management
-
-The Processing engine includes API capabilities that allow your plugins to
-interact with InfluxDB data and maintain state between executions.
-These features let you build more sophisticated plugins that can transform, analyze, and respond to data.
-
-### Use the shared API
-
-All plugins have access to the shared API to interact with the database.
+## Advanced trigger configuration

-#### Write data
-
-Use the `LineBuilder` API to create line protocol data:
+After creating basic triggers, you can enhance your plugins with these advanced features:

-```python
-# Create a line protocol entry
-line = LineBuilder("weather")
-line.tag("location", "us-midwest")
-line.float64_field("temperature", 82.5)
-line.time_ns(1627680000000000000)
-
-# Write the data to the database
-influxdb3_local.write(line)
-```
+### Step 1: Access community plugins from GitHub

-Writes are buffered while the plugin runs and are flushed when the plugin completes.
+### Step 1: Access community plugins from GitHub -{{% expand-wrapper %}} -{{% expand "View the `LineBuilder` Python implementation" %}} +Skip downloading plugins by referencing them directly from GitHub: -```python -from typing import Optional -from collections import OrderedDict - -class InfluxDBError(Exception): - """Base exception for InfluxDB-related errors""" - pass - -class InvalidMeasurementError(InfluxDBError): - """Raised when measurement name is invalid""" - pass - -class InvalidKeyError(InfluxDBError): - """Raised when a tag or field key is invalid""" - pass - -class InvalidLineError(InfluxDBError): - """Raised when a line protocol string is invalid""" - pass - -class LineBuilder: - def __init__(self, measurement: str): - if ' ' in measurement: - raise InvalidMeasurementError("Measurement name cannot contain spaces") - self.measurement = measurement - self.tags: OrderedDict[str, str] = OrderedDict() - self.fields: OrderedDict[str, str] = OrderedDict() - self._timestamp_ns: Optional[int] = None - - def _validate_key(self, key: str, key_type: str) -> None: - """Validate that a key does not contain spaces, commas, or equals signs.""" - if not key: - raise InvalidKeyError(f"{key_type} key cannot be empty") - if ' ' in key: - raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces") - if ',' in key: - raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas") - if '=' in key: - raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs") - - def tag(self, key: str, value: str) -> 'LineBuilder': - """Add a tag to the line protocol.""" - self._validate_key(key, "tag") - self.tags[key] = str(value) - return self - - def uint64_field(self, key: str, value: int) -> 'LineBuilder': - """Add an unsigned integer field to the line protocol.""" - self._validate_key(key, "field") - if value < 0: - raise ValueError(f"uint64 field '{key}' cannot be negative") - self.fields[key] = f"{value}u" - return self - - def int64_field(self, key: str, value: int) -> 'LineBuilder': - """Add an integer field to the line protocol.""" - self._validate_key(key, "field") - self.fields[key] = f"{value}i" - return self - - def float64_field(self, key: str, value: float) -> 'LineBuilder': - """Add a float field to the line protocol.""" - self._validate_key(key, "field") - # Check if value has no decimal component - self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value) - return self - - def string_field(self, key: str, value: str) -> 'LineBuilder': - """Add a string field to the line protocol.""" - self._validate_key(key, "field") - # Escape quotes and backslashes in string values - escaped_value = value.replace('"', '\\"').replace('\\', '\\\\') - self.fields[key] = f'"{escaped_value}"' - return self - - def bool_field(self, key: str, value: bool) -> 'LineBuilder': - """Add a boolean field to the line protocol.""" - self._validate_key(key, "field") - self.fields[key] = 't' if value else 'f' - return self - - def time_ns(self, timestamp_ns: int) -> 'LineBuilder': - """Set the timestamp in nanoseconds.""" - self._timestamp_ns = timestamp_ns - return self - - def build(self) -> str: - """Build the line protocol string.""" - # Start with measurement name (escape commas only) - line = self.measurement.replace(',', '\\,') - - # Add tags if present - if self.tags: - tags_str = ','.join( - f"{k}={v}" for k, v in self.tags.items() - ) - line += f",{tags_str}" - - # Add fields (required) - if not self.fields: - raise InvalidLineError(f"At least one field is required: 
{line}") - - fields_str = ','.join( - f"{k}={v}" for k, v in self.fields.items() - ) - line += f" {fields_str}" - - # Add timestamp if present - if self._timestamp_ns is not None: - line += f" {self._timestamp_ns}" - - return line -``` -{{% /expand %}} -{{% /expand-wrapper %}} - -#### Query data - -Execute SQL queries and get results: - -```python -# Simple query -results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'") - -# Parameterized query for safer execution -params = {"table": "metrics", "threshold": 90} -results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params) -``` - -The shared API `query` function returns results as a `List` of `Dict[String, Any]`, where the key is the column name and the value is the column value. - -#### Log information - -The shared API `info`, `warn`, and `error` functions accept multiple arguments, -convert them to strings, and log them as a space-separated message to the database log, -which is output in the server logs and captured in system tables that you can -query using SQL. - -Add logging to track plugin execution: - -```python -influxdb3_local.info("Starting data processing") -influxdb3_local.warn("Could not process some records") -influxdb3_local.error("Failed to connect to external API") - -# Log structured data -obj_to_log = {"records": 157, "errors": 3} -influxdb3_local.info("Processing complete", obj_to_log) -``` - -#### Use the in-memory cache - -The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions. - -Use the shared API `cache` property to access the cache API. - -```python -# Basic usage pattern -influxdb3_local.cache.METHOD(PARAMETERS) +```bash +# Create a trigger using a plugin from GitHub +influxdb3 create trigger \ + --trigger-spec "every:1m" \ + --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \ + --database my_database \ + system_metrics ``` -| Method | Parameters | Returns | Description | -|--------|------------|---------|-------------| -| `put` | `key` (str): The key to store the value under
`value` (Any): Any Python object to cache
`ttl` (Optional[float], default=None): Time in seconds before expiration
`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live | -| `get` | `key` (str): The key to retrieve
`default` (Any, default=None): Value to return if key not found
`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found | -| `delete` | `key` (str): The key to delete
`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
-
-##### Cache namespaces
-
-The cache system offers two distinct namespaces:
+This approach:

-| Namespace | Scope | Best For |
-| --- | --- | --- |
-| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
-| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |
+- Ensures you're using the latest version
+- Simplifies updates and maintenance
+- Reduces local storage requirements

-##### Store and retrieve cached data
+### Step 2: Configure your triggers

-```python
-# Store a value
-influxdb3_local.cache.put("last_run_time", time.time())
-
-# Retrieve a value with a default if not found
-last_time = influxdb3_local.cache.get("last_run_time", default=0)
-
-# Delete a cached value
-influxdb3_local.cache.delete("temporary_data")
-```
+#### Pass configuration arguments

-##### Store cached data with expiration
+Provide runtime configuration to your plugins:

-```python
-# Cache with a 5-minute TTL (time-to-live)
-influxdb3_local.cache.put("api_response", response_data, ttl=300)
+```bash
+# Pass threshold and email settings to a plugin
+influxdb3 create trigger \
+  --trigger-spec "every:1h" \
+  --plugin-filename "threshold_check.py" \
+  --trigger-arguments threshold=90,notify_email=admin@example.com \
+  --database my_database \
+  threshold_monitor
```
-
-##### Share data across plugins
+Your plugin accesses these values through the `args` parameter:

```python
-# Store in the global namespace
-influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)
-
-# Retrieve from the global namespace
-config = influxdb3_local.cache.get("config", use_global=True)
+def process_scheduled_call(influxdb3_local, call_time, args=None):
+    if args and "threshold" in args:
+        threshold = float(args["threshold"])
+        email = args.get("notify_email", "default@example.com")
+
+        # Use the arguments in your logic
+        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")
```
+#### Set execution mode

-##### Track state between executions
-
-```python
-# Get current counter or default to 0
-counter = influxdb3_local.cache.get("execution_count", default=0)
-
-# Increment counter
-counter += 1
-
-# Store the updated value
-influxdb3_local.cache.put("execution_count", counter)
+Choose between synchronous (default) or asynchronous execution:

-influxdb3_local.info(f"This plugin has run {counter} times")
+```bash
+# Allow multiple trigger instances to run simultaneously
+influxdb3 create trigger \
+  --trigger-spec "table:metrics" \
+  --plugin-filename "heavy_process.py" \
+  --run-asynchronous \
+  --database my_database \
+  async_processor
```

-#### Best practices for in-memory caching
+Use asynchronous execution when:

-- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
-- [Use TTL appropriately](#use-ttl-appropriately)
-- [Cache computation results](#cache-computation-results)
-- [Warm the cache](#warm-the-cache)
-- [Consider cache limitations](#consider-cache-limitations)
+- Processing might take longer than the trigger interval
+- Multiple events need to be handled simultaneously
+- Performance is more important than sequential execution

-##### Use the trigger-specific namespace
+#### Configure error handling

-The cache is designed to support stateful operations while maintaining
isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary. - -##### Use TTL appropriately -Set realistic expiration times based on how frequently data changes. - -```python -# Cache external API responses for 5 minutes -influxdb3_local.cache.put("weather_data", api_response, ttl=300) -``` - -##### Cache computation results -Store the results of expensive calculations that need to be utilized frequently. -```python -# Cache aggregated statistics -influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600) -``` - -##### Warm the cache -For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data. - -```python -# Check if cache needs to be initialized -if not influxdb3_local.cache.get("lookup_table"): - influxdb3_local.cache.put("lookup_table", load_lookup_data()) +Control how your trigger responds to errors: +```bash +# Automatically retry on error +influxdb3 create trigger \ + --trigger-spec "table:important_data" \ + --plugin-filename "critical_process.py" \ + --error-behavior retry \ + --database my_database \ + critical_processor ``` -##### Consider cache limitations - -- **Memory Usage**: Since cache contents are stored in memory, monitor your memory usage when caching large datasets. -- **Server Restarts**: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above). -- **Concurrency**: Be cautious of accessing inaccurate or out-of-date data when multiple trigger instances might simultaneously update the same cache key. - ## Install Python dependencies If your plugin needs additional Python packages, use the `influxdb3 install` command: