A Celery utility for dynamically extending AWS SQS message visibility timeouts for long-running tasks.
🛡️ Perfect Pairing: If you are running your Celery workers on AWS ECS, we highly recommend using this library alongside ECS Task Shield to protect your long-running containers from auto-scaling scale-in events!
AWS Batch is a powerful tool for heavy processing. For some workloads, though, an auto-scaled Celery worker pool backed by SQS, where you manage the desired task count yourself (on ECS, EKS, or another container orchestrator), offers several key advantages:
- Warm Pools vs. Cold Starts: Celery worker pools can be kept warm. If a user triggers an action in your app, an active Celery worker starts processing in milliseconds. AWS Batch can take minutes to schedule a job and provision a cold container, meaning the compute overhead of starting and stopping the container can actually exceed the runtime of shorter tasks.
- Heterogeneous Workloads: It is significantly simpler to maintain one unified Celery/ECS architecture that handles both 5-second tasks and 4-hour tasks, rather than splitting your infrastructure between Celery (for fast tasks) and AWS Batch (for slow tasks).
- Granular Retries & DLQ Integration: Celery and SQS provide powerful, code-level retry capabilities (like exponential backoff) and native Dead Letter Queue (DLQ) integration. In contrast, AWS Batch retries require re-running the entire container lifecycle from scratch, which is slow and resource-intensive for transient errors.
- Local Disk & Memory Caching: AWS Batch provisions a cold, empty container for every job. With Celery, your worker can download a massive ML model or dataset once on boot and keep it cached on disk or in memory for the next 1,000 inferences.
- API & Worker Synergy: You can reuse the exact same Docker images, IAM roles, environment variables, and Terraform Task Definitions between your synchronous API and your asynchronous Celery workers.
- Local Developer Experience: Celery and SQS (via LocalStack) are incredibly easy to run and test locally via
docker-compose.
This architectural pattern—and the SQS Extender—shine in scenarios with highly variable execution times, such as:
- Video & Audio Transcoding: One task takes 5 seconds (small clip), another takes 6 hours (4K movie).
- Large-Scale Data ETL: Processing columnar data uploads where row counts vary wildly from 10 rows to 10,000,000 rows.
- AI/ML Batch Inference: Where processing time depends heavily on input size, sequence length, or batch size.
When using AWS SQS as a Celery broker, you typically set a global visibility_timeout (e.g., 2 hours). If a worker
picks up a task, SQS hides that message from other workers for 2 hours.
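For reference, a global timeout like this is usually set through Celery's `broker_transport_options`. The following is a minimal sketch of a celeryconfig-style settings module; the region and the 2-hour value are assumptions chosen for illustration.

```python
# Celery settings module sketch (e.g. celeryconfig.py).
# The region and timeout below are illustrative assumptions.
VISIBILITY_TIMEOUT_SECONDS = 2 * 60 * 60  # 2 hours, expressed in seconds

broker_url = "sqs://"  # credentials come from the environment or the IAM role
broker_transport_options = {
    "region": "us-east-1",
    "visibility_timeout": VISIBILITY_TIMEOUT_SECONDS,
}
```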
However, if task durations vary widely (one job takes 5 minutes, another takes 4 hours), a single global timeout becomes a problem. With a 2-hour timeout, a 4-hour job loses its visibility window halfway through. SQS then treats the message as unprocessed and delivers it to another worker, which leads to duplicate processing, race conditions, and wasted compute.
This package addresses the problem by letting your tasks request more time directly from AWS SQS while they are executing. It intercepts Celery's Kombu message payload, extracts the AWS ReceiptHandle, and manages the AWS API calls safely and concurrently.
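To make the underlying AWS call concrete, here is a minimal sketch of a visibility extension built on boto3's `change_message_visibility`. It is not this package's actual implementation: the helper name `extend_visibility` and its parameters are hypothetical, and the client is passed in so the function can be exercised without AWS.

```python
import datetime as dt

# SQS caps a message's visibility timeout at 12 hours.
SQS_MAX_VISIBILITY = dt.timedelta(hours=12)


def extend_visibility(sqs_client, queue_url, receipt_handle, extension):
    """Ask SQS to keep an in-flight message hidden for `extension` from now.

    Hypothetical helper: `sqs_client` is expected to behave like
    boto3.client("sqs"). Returns the number of seconds requested.
    """
    if extension <= dt.timedelta(0):
        raise ValueError("extension must be positive")
    # Clamp to the SQS maximum so a single request is never out of range.
    seconds = int(min(extension, SQS_MAX_VISIBILITY).total_seconds())
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds,
    )
    return seconds
```

The new timeout is counted from the moment of the call, not added to the time already elapsed. The 12-hour limit, by contrast, is measured from when the message was first received, so the clamp above is only a rough guard and a very long task can still run into that ceiling.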
Features
- Zero-Pollution Payload: Extracts the SQS ReceiptHandle directly from Kombu's transport layer. You don't need to change your Celery task signatures or payloads.
- Thread-Safe Boto3: Safely lazy-loads boto3 clients and caches QueueUrl resolutions to prevent blocking or crashing gevent / threading worker pools.
- Graceful Fallbacks: Swallows AWS errors caused by already-expired messages, so a task that is about to succeed is not abruptly killed at the finish line.
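The lazy-loading and caching idea from the list above can be sketched roughly as follows. This is an illustration under assumptions, not the package's code: `QueueUrlCache` and `client_factory` are hypothetical names, and the only AWS call relied on is boto3's `get_queue_url`.

```python
import threading


class QueueUrlCache:
    """Lazily creates one SQS client and memoizes queue-name -> QueueUrl lookups."""

    def __init__(self, client_factory):
        # client_factory is a zero-argument callable,
        # e.g. lambda: boto3.client("sqs")
        self._client_factory = client_factory
        self._client = None
        self._urls = {}
        self._lock = threading.Lock()

    def get(self, queue_name):
        """Return the QueueUrl for queue_name, resolving it at most once."""
        with self._lock:
            if queue_name not in self._urls:
                if self._client is None:
                    # The client is only built on first use, not at import time.
                    self._client = self._client_factory()
                response = self._client.get_queue_url(QueueName=queue_name)
                self._urls[queue_name] = response["QueueUrl"]
            return self._urls[queue_name]
```

Holding the lock during the lookup keeps the sketch simple at the cost of serializing first-time resolutions; a per-queue lock would be a reasonable refinement if many queues are resolved concurrently.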
For this library to function, you must configure your Celery tasks to use Late Acknowledgment.
Unlike "Early Ack" (the default), which acknowledges and therefore deletes the SQS message just before the task starts, Late Ack keeps the message "in-flight" until the
task finishes. This is technically necessary because once a message is deleted, its ReceiptHandle is invalidated and
its visibility cannot be extended. Late Ack also improves data safety: if a worker crashes mid-task, the message is not
lost and will eventually reappear in the queue or land in the DLQ for re-processing. Note that when only the child process
running the task is killed (for example, by the OOM killer), Celery still acknowledges the message by default; setting
task_reject_on_worker_lost = True changes that behavior.
You can enable this globally in your Celery configuration or on a per-task basis:
# Option 1: Global configuration
task_acks_late = True

# Option 2: Per-task definition
@app.task(acks_late=True)
def my_long_running_task():
    ...

To allow your workers to extend SQS visibility, the IAM role associated with your application (e.g., the ECS Task Role) must have the following permissions:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["sqs:GetQueueUrl", "sqs:ChangeMessageVisibility"],
      "Resource": ["arn:aws:sqs:region:account:queue-name"]
    }
  ]
}

Note: Replace region, account, and queue-name with your actual values. If your tasks are distributed across multiple
queues, you can use a wildcard (e.g., arn:aws:sqs:us-east-1:123456789012:*).
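If you would rather list each queue explicitly than use a wildcard, the policy document can be generated. The sketch below is only an illustration: the helper name `build_sqs_extender_policy`, the account ID, and the queue names are all made up.

```python
import json


def build_sqs_extender_policy(region, account_id, queue_names):
    """Return an IAM policy document (as a dict) covering the given queues."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sqs:GetQueueUrl", "sqs:ChangeMessageVisibility"],
                # One ARN per queue instead of a wildcard.
                "Resource": [
                    f"arn:aws:sqs:{region}:{account_id}:{name}"
                    for name in queue_names
                ],
            }
        ],
    }


# Example values are placeholders, not real resources.
policy = build_sqs_extender_policy(
    "us-east-1", "123456789012", ["video-transcode", "etl-ingest"]
)
print(json.dumps(policy, indent=2))
```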
Simply wrap your long-running Celery tasks with the @manage_sqs_visibility decorator.
import datetime as dt

from celery_sqs_extender import Injected, SqsVisibilityContext, manage_sqs_visibility

# celery_app, db, and transcode_chunk are assumed to be defined elsewhere in your application.
@celery_app.task()
@manage_sqs_visibility()
def process_massive_video_file(
    video_id: str,
    # Injected() satisfies static type checkers (like mypy/pyright) by providing a
    # mock default, since the decorator injects the real context at runtime.
    sqs_ctx: SqsVisibilityContext = Injected(),
):
    video = db.get_video(video_id)
    for chunk in video.chunks:
        # Transcode the chunk (takes a variable amount of time)
        transcode_chunk(chunk)
        # We are still alive! Request 1 more hour of processing time from SQS
        # to ensure no other worker picks up this task.
        sqs_ctx.extend(dt.timedelta(hours=1))

If you use LocalStack for local development (for example, via docker-compose), you can pass your local endpoint URL directly into the decorator.
import os

from celery_sqs_extender import Injected, SqsVisibilityContext, manage_sqs_visibility

LOCALSTACK_URL = os.getenv("SQS_ENDPOINT_URL")  # e.g., http://localstack:4566

@celery_app.task()
@manage_sqs_visibility(endpoint_url=LOCALSTACK_URL)
def my_task(sqs_ctx: SqsVisibilityContext = Injected()):
    pass

By default, the decorator assumes your Celery routing_key exactly matches your AWS SQS Queue name. If you are using complex exchange topics, fanout architectures, or prefixed queue names, you can explicitly override the SQS queue name.
import datetime as dt

from celery_sqs_extender import Injected, SqsVisibilityContext, manage_sqs_visibility

@celery_app.task(queue="video.transcode.high")
@manage_sqs_visibility(queue_name="prod-video-transcode-queue")
def process_video(sqs_ctx: SqsVisibilityContext = Injected()):
    sqs_ctx.extend(dt.timedelta(hours=1))
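
When a task loops over many small units of work, calling extend on every iteration means one AWS API call per iteration. One possible way to limit that is a small throttle around the call. The sketch below is not part of this package: `ThrottledExtender` is a hypothetical helper, and it assumes only that `sqs_ctx.extend` accepts a timedelta, as in the examples above.

```python
import datetime as dt
import time


class ThrottledExtender:
    """Calls `extend(extension)` at most once per `min_interval`."""

    def __init__(
        self,
        extend,
        extension=dt.timedelta(hours=1),
        min_interval=dt.timedelta(minutes=10),
        clock=time.monotonic,
    ):
        self._extend = extend  # e.g. sqs_ctx.extend
        self._extension = extension
        self._min_interval = min_interval.total_seconds()
        self._clock = clock  # injectable so the throttle can be tested
        self._last_call = None

    def tick(self):
        """Extend if enough time has passed; return True when a call was made."""
        now = self._clock()
        if self._last_call is not None and now - self._last_call < self._min_interval:
            return False
        self._extend(self._extension)
        self._last_call = now
        return True
```

Inside a task you would create `ThrottledExtender(sqs_ctx.extend)` once and call `tick()` after each chunk. Keep `min_interval` comfortably shorter than `extension`, and make sure no single chunk can run longer than the extension, otherwise the visibility window can still lapse between calls.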