-
Notifications
You must be signed in to change notification settings - Fork 917
Open
Description
Hi! I’m experimenting with the new Redpanda Connect Plugins framework. During pipeline shutdown, I see the following ERROR logs:
INFO[2026-01-23T07:44:07Z] Running main config from specified file @service=redpanda-connect benthos_version=4.78.0 path=connect.yaml
INFO[2026-01-23T07:44:07Z] Successfully loaded Redpanda license @service=redpanda-connect expires_at="2036-01-21T07:44:07Z" license_org="" license_type=open_source
INFO[2026-01-23T07:44:07Z] Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO[2026-01-23T07:44:08Z] Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO[2026-01-23T07:44:08Z] Output type file is now active @service=redpanda-connect label="" path=root.output
INFO[2026-01-23T07:44:08Z] Input type generate is now active @service=redpanda-connect label="" path=root.input
INFO[2026-01-23T07:44:09Z] ERROR:grpc._cython.cygrpc:Exception not handled by _handle_exceptions in servicer method [/redpanda.runtime.v1alpha1.BatchProcessorService/Close] @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] Traceback (most recent call last): @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 787, in grpc._cython.cygrpc._schedule_rpc_coro @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] asyncio.exceptions.CancelledError @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] Traceback (most recent call last): @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 787, in grpc._cython.cygrpc._schedule_rpc_coro @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] asyncio.exceptions.CancelledError @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stderr
INFO[2026-01-23T07:44:09Z] Successfully loaded Redpanda Connect RPC plugin @service=redpanda-connect label="" path=root.pipeline.processors.0 source=stdout
INFO[2026-01-23T07:44:09Z] Pipeline has terminated. Shutting down the service @service=redpanda-connect
The pipeline itself works as expected. Is this expected behavior, and is there a way to avoid these ERROR logs during shutdown?
Here’s the setup:
connect.yaml
input:
generate:
mapping: |
root = {
"message": fake("sentence")
}
interval: ""
count: 1000
pipeline:
processors:
- yell: {}
output:
file:
codec: lines
path: ./processed/output.jsonlplugin.yaml
name: yell
summary: yells back what you give it
command: ["uv", "run", "--no-sync", "--frozen", "main.py"]
type: processor
fields: []main.py
import asyncio
import logging
from datetime import datetime, timezone
from redpanda_connect import Message, processor, processor_main
logging.getLogger("grpc").setLevel(logging.ERROR)
logging.getLogger("grpc._cython.cygrpc").setLevel(logging.ERROR)
logging.getLogger("redpanda_connect._grpc").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
@processor
def yell(msg: Message) -> Message:
try:
data = msg.payload
data["message"] = data["message"].upper()
data["yelled_at"] = datetime.now(timezone.utc)
return msg
except Exception as err:
logger.error(f"Error processing message: {err}", exc_info=True)
return msg
if __name__ == "__main__":
asyncio.run(processor_main(yell))Metadata
Metadata
Assignees
Labels
No labels