- Consuming
- Publishing
- Relay Mode
- Continuously relay messages from your RabbitMQ instance to a Batch.sh collection
- Continuously relay messages from an SQS queue to a Batch.sh collection
- Continuously relay messages from an Azure queue to a Batch.sh collection
- Continuously relay messages from an Azure topic to a Batch.sh collection
- Continuously relay messages from multiple Redis channels to a Batch.sh collection
- Continuously relay messages from multiple Redis streams to a Batch.sh collection
- Continuously relay messages from a Kafka topic (on Confluent) to a Batch.sh collection (via CLI)
- Advanced Usage
Read X number of messages
plumber read aws-sqs --queue-name=orders --max-num-messages=10
Read messages and delete them afterwards
plumber read aws-sqs --queue-name=orders --max-num-messages=10 --auto-delete
Continuously read messages
plumber read aws-sqs --queue-name=orders --follow
Poll for new messages for X seconds
plumber read aws-sqs --queue-name=orders --wait-time-seconds=20
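These flags can also be combined. For example, a long-poll read that deletes each message as it is consumed (a sketch; assuming the flags compose as expected, which is worth verifying against your plumber version):

# Long-poll continuously, deleting messages after they are read
plumber read aws-sqs --queue-name=orders --wait-time-seconds=20 --auto-delete --follow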
plumber read rabbit \
--address="amqp://localhost:5672" \
--exchange=testex \
--queue=testqueue \
--routing-key="orders.#" \
--follow
Read a single message
plumber read kafka --topic orders --address="some-machine.domain.com:9092"
Continuously read messages
plumber read kafka --topic orders --address="some-machine.domain.com:9092" --follow
Reading from a topic
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber read azure --topic="new-orders" --subscription="copy-of-new-orders"
Reading from a queue
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber read azure --queue "new-orders"
plumber read nats --address="nats://user:pass@some-machine.domain.com:4222" --subject "test-subject"
plumber read redis-pubsub --address="localhost:6379" --channels="new-orders"
plumber read redis-streams --address="localhost:6379" --streams="new-orders"
plumber read gcp-pubsub --project-id=PROJECT_ID --sub-id=SUBSCRIPTION
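Because the payloads in these examples are JSON, consumed messages can be piped into standard tooling such as jq. A sketch (this assumes plumber emits the raw message body to stdout, which is worth confirming for your version):

# Pull out the status field of each order as it arrives
plumber read kafka --topic orders --address="some-machine.domain.com:9092" --follow | jq '.status'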
plumber write aws-sqs --queue-name=NewOrders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write rabbit --address="amqp://rabbit.yourdomain.net:5672" --exchange=NewOrders --routing-key="orders.oregon.coffee" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write kafka --address="localhost:9092" --topic=neworders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write aws-sns --topic="arn:aws:sns:us-east-2:123456789012:MyTopic" --input-data="A new order is ready for processing!"
Publishing to a topic
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber write azure --topic="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Publishing to a queue
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber write azure --queue="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write nats --address="nats://user:pass@some-machine.domain.com:4222" --subject "test-subject" --input-data "Hello World"
plumber write redis-pubsub --address="localhost:6379" --channels="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write redis-streams --address="localhost:6379" --streams="new-orders" --key foo --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input-data='{"Sensor":"Room J","Temp":19}'
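To publish a batch of events, a plain shell loop works with any of the write commands above. A sketch, assuming a hypothetical orders.jsonl file containing one JSON object per line:

# Publish each line of orders.jsonl as a separate message
while read -r line; do
  plumber write kafka --address="localhost:9092" --topic=neworders --input-data="$line"
done < orders.jsonl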
docker run -d --name plumber-rabbit -p 8080:8080 \
-e PLUMBER_RELAY_TYPE=rabbit \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
-e PLUMBER_RELAY_RABBIT_EXCHANGE=my_exchange \
-e PLUMBER_RELAY_RABBIT_QUEUE=my_queue \
-e PLUMBER_RELAY_RABBIT_ROUTING_KEY=some.routing.key \
-e PLUMBER_RELAY_RABBIT_QUEUE_EXCLUSIVE=false \
-e PLUMBER_RELAY_RABBIT_QUEUE_DURABLE=true \
batchcorp/plumber
docker run -d --name plumber-sqs -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
-e PLUMBER_RELAY_SQS_QUEUE_NAME=TestQueue \
-e PLUMBER_RELAY_TYPE=aws-sqs \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
batchcorp/plumber
docker run -d --name plumber-azure -p 8080:8080 \
-e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
-e PLUMBER_RELAY_AZURE_QUEUE_NAME=neworders \
-e PLUMBER_RELAY_TYPE=azure \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
batchcorp/plumber
docker run -d --name plumber-azure -p 8080:8080 \
-e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
-e PLUMBER_RELAY_AZURE_TOPIC_NAME=neworders \
-e PLUMBER_RELAY_AZURE_SUBSCRIPTION=some-sub \
-e PLUMBER_RELAY_TYPE=azure \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
batchcorp/plumber
docker run -d --name plumber-redis-pubsub -p 8080:8080 \
-e PLUMBER_RELAY_REDIS_PUBSUB_ADDRESS=localhost:6379 \
-e PLUMBER_RELAY_REDIS_PUBSUB_CHANNELS=channel1,channel2 \
-e PLUMBER_RELAY_TYPE=redis-pubsub \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
batchcorp/plumber
docker run -d --name plumber-redis-streams -p 8080:8080 \
-e PLUMBER_RELAY_REDIS_STREAMS_ADDRESS=localhost:6379 \
-e PLUMBER_RELAY_REDIS_STREAMS_STREAMS=stream1,stream2 \
-e PLUMBER_RELAY_TYPE=redis-streams \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
batchcorp/plumber
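Rather than repeating -e flags, the relay settings can also be kept in an env file and passed with Docker's --env-file option. A sketch using the Redis Streams relay above (note that --env-file takes literal KEY=VALUE pairs, so paste the actual token rather than a shell variable):

# relay.env -- one KEY=VALUE per line, no quoting needed
cat > relay.env <<'EOF'
PLUMBER_RELAY_TYPE=redis-streams
PLUMBER_RELAY_REDIS_STREAMS_ADDRESS=localhost:6379
PLUMBER_RELAY_REDIS_STREAMS_STREAMS=stream1,stream2
PLUMBER_RELAY_TOKEN=your-batchsh-token-here
EOF

docker run -d --name plumber-redis-streams -p 8080:8080 --env-file relay.env batchcorp/plumber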
export PLUMBER_RELAY_TYPE="kafka"
export PLUMBER_RELAY_TOKEN="$YOUR_BATCHSH_TOKEN_HERE"
export PLUMBER_RELAY_KAFKA_ADDRESS="pkc-4kgmg.us-west-2.aws.confluent.cloud:9092"
export PLUMBER_RELAY_KAFKA_TOPIC="$YOUR_TOPIC"
export PLUMBER_RELAY_KAFKA_INSECURE_TLS="true"
export PLUMBER_RELAY_KAFKA_USERNAME="$YOUR_CONFLUENT_API_KEY"
export PLUMBER_RELAY_KAFKA_PASSWORD="$YOUR_CONFLUENT_API_SECRET"
export PLUMBER_RELAY_KAFKA_SASL_TYPE="plain"
plumber relay
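The same Confluent relay can also be run in Docker, mirroring the earlier relay examples by passing the identical variables with -e:

docker run -d --name plumber-kafka -p 8080:8080 \
-e PLUMBER_RELAY_TYPE=kafka \
-e PLUMBER_RELAY_TOKEN=$YOUR_BATCHSH_TOKEN_HERE \
-e PLUMBER_RELAY_KAFKA_ADDRESS="pkc-4kgmg.us-west-2.aws.confluent.cloud:9092" \
-e PLUMBER_RELAY_KAFKA_TOPIC=$YOUR_TOPIC \
-e PLUMBER_RELAY_KAFKA_INSECURE_TLS=true \
-e PLUMBER_RELAY_KAFKA_USERNAME=$YOUR_CONFLUENT_API_KEY \
-e PLUMBER_RELAY_KAFKA_PASSWORD=$YOUR_CONFLUENT_API_SECRET \
-e PLUMBER_RELAY_KAFKA_SASL_TYPE=plain \
batchcorp/plumber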
$ plumber read rabbit --address="amqp://localhost" --exchange events --routing-key \# \
--line-numbers --protobuf-dir ~/schemas --protobuf-root-message Message --follow
1: {"some-attribute": 123, "numbers" : [1, 2, 3]}
2: {"some-attribute": 424, "numbers" : [325]}
3: {"some-attribute": 49, "numbers" : [958, 288, 289, 290]}
4: ERROR: Cannot decode message as protobuf "Message"
5: {"some-attribute": 394, "numbers" : [4, 5, 6, 7, 8]}
^C
NOTE: "jsonpb" is just a JSON representation of your protobuf event. When you
use it as the --input-type
, plumber
will read the JSON blob and attempt
to decode it into your specified root message, followed by writing the []byte
slice to the message bus.
plumber write rabbit --exchange events --routing-key foo.bar \
--protobuf-dir ~/schemas --protobuf-root-message Message \
--input-file ~/fakes/some-jsonpb-file.json --input-type jsonpb
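For reference, here is a hypothetical schema and input file consistent with the output shown above; the field names and types are illustrative assumptions, so substitute your real .proto:

# Hypothetical ~/schemas/message.proto matching the fields in the output above
cat > ~/schemas/message.proto <<'EOF'
syntax = "proto3";

message Message {
  int32 some_attribute = 1 [json_name = "some-attribute"];
  repeated int32 numbers = 2;
}
EOF

# Hypothetical jsonpb input -- a JSON rendering of one Message event
cat > ~/fakes/some-jsonpb-file.json <<'EOF'
{"some-attribute": 123, "numbers": [1, 2, 3]}
EOF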
plumber write kafka --topic=orders --avro-schema=some_schema.avsc --input-file=your_data.json
plumber read kafka --topic=orders --avro-schema=some_schema.avsc
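For context, a minimal example of what some_schema.avsc might look like, assuming your_data.json holds order records like the payloads used throughout this section (a sketch; adjust names and types to match your data):

# Hypothetical Avro schema describing an order record
cat > some_schema.avsc <<'EOF'
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "status", "type": "string"}
  ]
}
EOF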