Skip to content

postgres(minor): async query support via dblink + MongoDB job tracking#1007

Open
salatv-ai wants to merge 9 commits into
Appmixer-ai:devfrom
salatv-ai:977
Open

postgres(minor): async query support via dblink + MongoDB job tracking#1007
salatv-ai wants to merge 9 commits into
Appmixer-ai:devfrom
salatv-ai:977

Conversation

@salatv-ai

@salatv-ai salatv-ai commented Mar 10, 2026

Copy link
Copy Markdown
Contributor

Async Query Support via dblink + MongoDB Job Tracking

Overview

Adds a non-blocking async execution path to the PostgreSQL Query component. Long-running queries are dispatched via dblink_send_query and tracked in MongoDB — allowing the Appmixer flow to return immediately and deliver results later without occupying a worker thread.


Architecture

Async Execution Flow

asyncMode=true
  → Query.js: insertOne(pgAsyncJobs) + dblink_send_query()
  → return immediately (no blocking)

jobs.js (poll every 5s):
  → findOne({ status: "pending" })
  → dblink_is_busy() → still running? skip
  → done → dblink_get_result() → updateOne({ status: "done", result })
  → triggerComponent(flowId, componentId, _asyncJobId)

Query.js receive(_asyncJobId):
  → findOne(pgAsyncJobs, { _asyncJobId })
  → sendJson(out)

Key Components

File Purpose
AsyncJobModel.js MongoDB model for pgAsyncJobs collection; TTL index for automatic cleanup
connections.js Process-global pg.Client pool for persistent dblink sessions; survives require() cache clears (same pattern as Kafka connector)
jobs.js Background poller (5s) + cleanup job (10min); distributed locks prevent duplicate processing across cluster nodes
config.js Configurable polling intervals and TTL thresholds
plugin.js Plugin entry point — registers background jobs on connector load
Query.js Unified component: sync path unchanged; async path added as opt-in via asyncMode toggle

Why dblink?

PostgreSQL's dblink extension allows one connection to fire a query and another to poll its status — decoupling execution from result delivery. This avoids long-held connections and worker starvation on slow queries.

Why MongoDB for job state?

Appmixer already uses MongoDB as its operational store. Storing async job state there (vs. Redis or in-memory) gives us:

  • Persistence across restarts
  • Cluster-safe — any node can pick up a job
  • TTL cleanup — old jobs expire automatically

Sync Path

Unchanged. asyncMode=false (default) goes through the existing execution path with zero overhead.

vtalas added 2 commits March 11, 2026 16:36
- Add AsyncJobModel.js: MongoDB model for pgAsyncJobs collection with TTL cleanup
- Add connections.js: process-global pg.Client management for dblink sessions
  (same pattern as Kafka connector, survives require() cache clears)
- Add jobs.js: polling job (every 5s) + cleanup job (every 10min), both with
  distributed locks to prevent duplicate processing across cluster nodes
- Add config.js: configurable schedules and thresholds
- Add plugin.js: plugin entry point registering background jobs
- Update Query.js: async mode path (startAsyncQuery + deliverAsyncResult),
  sync path unchanged; results delivered via triggerComponent from jobs.js
- Update Query component.json: asyncMode toggle input (index 3)
- Bump bundle.json to 2.2.0 with changelog

Flow:
  asyncMode=true → insertOne(pgAsyncJobs) → dblink_send_query → return
  jobs.js polls → dblink_is_busy → done → updateOne(result) → triggerComponent
  receive(_asyncJobId) → findOne(pgAsyncJobs) → sendJson(out)

Closes: part of appmixer-connectors#977
- test-flow-crud.json: full CRUD lifecycle (CreateRow/Query/UpdateRow/DeleteRow)
- test-flow-query.json: Query outputTypes - rows, row, file
- test-flow-schema.json: ListTables + ListColumns
- test-flow-async-query.json: asyncMode (dblink) end-to-end (AfterAll timeout 120s)

Part of Appmixer-ai#977
- Replace double quotes with single quotes
- Rename _row to jsonRow to avoid no-underscore-dangle
- Use named catch variable instead of _
@salatv-ai

Copy link
Copy Markdown
Contributor Author

Code Review — Postgres Async Query (PR #1007)

✅ What looks good

  • Architecture: dblink-based async execution is a solid approach for long-running PG queries. Clean separation: connections.js (in-memory connection tracking), AsyncJobModel.js (Mongo persistence), jobs.js (polling/cleanup), plugin.js (init).
  • Resilience: Good recovery pattern — if a node restarts, jobs.js detects orphaned jobs (no local connection) and restarts the query. Stuck job cleanup with configurable threshold is nice.
  • Distributed locking: Both poll and cleanup jobs use locks to avoid concurrent execution across cluster nodes.
  • TTL index on updatedAt for auto-cleanup of completed/errored jobs.
  • Sync mode preserved: Existing sync behavior is untouched (just restructured for readability).
  • Bug fix: messagesmessage typo in emptyResult output — good catch.

⚠️ Issues / Suggestions

1. Security: auth credentials stored in MongoDB
startAsyncQuery() persists context.auth (including dbPassword) directly into the pgAsyncJobs collection. This means plaintext PG passwords sit in Mongo. Consider:

  • Encrypting sensitive fields before storage
  • Or not persisting auth at all — instead, re-fetch it from the flow/component context during recovery

2. buildConnStr is vulnerable to injection
The buildConnStr() function concatenates auth values directly into a libpq connection string without escaping. If dbHost, dbUser, or dbPassword contain spaces or special characters, the connection string breaks or worse. Consider using pg-connection-string or at minimum quoting the values: '${value}'.

3. Missing query validation in async mode
Async mode wraps the user query in SELECT row_to_json(t)::text AS _row FROM (${query}) t — this is direct string interpolation. While the sync mode uses QueryStream (parameterized), the async path does not. This is a regression from the SQL injection fix in v2.1.0.

4. _asyncJobId and _asyncError in component.json schema
These fields are used internally by triggerComponent() but are not declared in component.json schema. While this works, it means:

  • They could be set by a user in the UI (unintended behavior)
  • Consider adding them as hidden/private fields or checking the source of the message

5. Plugin requires dblink extension
CREATE EXTENSION IF NOT EXISTS dblink requires superuser or CREATE privilege on the database. The tooltip mentions this, but the error message when it fails would be cryptic. Consider catching and providing a clearer error.

6. process.PG_ASYNC_CONNECTIONS global
Attaching to process object is fragile — it survives require cache clears but is unconventional. Comment mentions Kafka connector does the same, so this might be an established pattern in the codebase. Just flagging it.

7. Test flows
Four test flow files added — great coverage.

🔧 ESLint fix applied

Fixed 3 ESLint errors in connections.js:

  • Double quotes → single quotes
  • _rowjsonRow (no-underscore-dangle)
  • Unnamed catch _e

Summary

Overall solid implementation. The main concerns are security (plaintext credentials in Mongo, connection string injection) and SQL injection in async mode. The architecture and recovery patterns are well-thought-out.

Salat added 6 commits March 13, 2026 12:31
…ty result test

- Added sync query step (regression check that sync mode still works)
- Added async empty result test (emptyResult port)
- All 3 assert branches connected to AfterAll
- Timeout increased to 120s for async polling delay
AsyncJobModel was extending context.db.Model which doesn't exist in
the Appmixer component context. Since all CRUD operations already use
context.db.collection() directly, the Model base class was unnecessary.

Simplified AsyncJobModel to a plain object exporting only the collection
name constant.

Fixes TypeError: Cannot read properties of undefined (reading 'Model')
Component context doesn't have context.db — only service-level
(plugin/jobs/routes) does. Refactored async query architecture:

- Query.js now registers jobs via context.service.stateAddToSet()
  instead of writing to MongoDB directly
- jobs.js syncs pending jobs from service state to MongoDB on each poll
- Result rows are delivered inline via triggerComponent payload
  (_asyncRows) instead of component reading from MongoDB

This fixes TypeError: Cannot read properties of undefined (reading
'collection') in component context.
triggerComponent delivers data through context.messages.webhook, not
context.messages.in. The component was crashing because
context.messages.in was undefined when called back by jobs.js.

Check for webhook message first before accessing input port.
… errors

- Check both webhook.content.data, webhook.content and in.content for
  async callback data from triggerComponent (covers all Appmixer versions)
- Also check in port for async data in case triggerComponent routes there
- Rename _asyncJobId/_asyncRows/_asyncError to drop leading underscore
  (eslint no-underscore-dangle rule)
…ping

The user query gets wrapped in a subquery for dblink:
SELECT row_to_json(t)::text FROM (<query>) t

A trailing semicolon in the user query causes a syntax error inside
the subquery parentheses.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants