Replies: 2 comments 1 reply
Hey @Sidcapman, there might be something wrong with your setup. This worked for me:
$ docker run --rm -it --name source -p8081:8081 -p9092:9092 redpandadata/redpanda start --node-id 0 --mode dev-container --set "rpk.additional_start_flags=[--reactor-backend=epoll]" --set redpanda.auto_create_topics_enabled=false --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr localhost:9092 --schema-registry-addr 0.0.0.0:8081
$ docker run --rm -it --name sink -p8082:8081 -p9093:9092 redpandadata/redpanda start --node-id 0 --mode dev-container --set "rpk.additional_start_flags=[--reactor-backend=epoll]" --set redpanda.auto_create_topics_enabled=false --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr localhost:9093 --schema-registry-addr 0.0.0.0:8081
$ rpk topic create foobar -p1 -X brokers=localhost:9092
$ rpk topic create foobar -p1 -X brokers=localhost:9093
$ for i in {1..10}; do echo -n "${i}_${i}" | rpk topic produce foobar -f '%d_%v' --allow-auto-topic-creation -X brokers=localhost:9092; done
$ rpk topic consume foobar -o ':end' -X brokers=localhost:9092
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foobar ]
    consumer_group: "test"
  processors:
    - log:
        fields_mapping: |
          root.content = content().string()
          root.kafka_timestamp_ms = @kafka_timestamp_ms
        message: Transferring record
output:
  kafka_franz:
    seed_brokers: [ localhost:9093 ]
    topic: foobar
    timestamp_ms: ${! metadata("kafka_timestamp_ms") }
$ redpanda-connect run config.yaml
INFO Running main config from specified file @service=redpanda-connect benthos_version="" path=./tmp/redpanda_input_output.yaml
INFO Successfully loaded Redpanda license @service=redpanda-connect expires_at="2035-05-04T18:41:47+01:00" license_org="" license_type="open source"
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO Output type kafka_franz is now active @service=redpanda-connect label="" path=root.output
INFO Input type kafka_franz is now active @service=redpanda-connect label="" path=root.input
INFO Transferring record @service=redpanda-connect content=1 custom_source=true kafka_timestamp_ms=1 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=2 custom_source=true kafka_timestamp_ms=2 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=3 custom_source=true kafka_timestamp_ms=3 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=4 custom_source=true kafka_timestamp_ms=4 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=5 custom_source=true kafka_timestamp_ms=5 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=6 custom_source=true kafka_timestamp_ms=6 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=7 custom_source=true kafka_timestamp_ms=7 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=8 custom_source=true kafka_timestamp_ms=8 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=9 custom_source=true kafka_timestamp_ms=9 label="" path=root.input.processors.0
INFO Transferring record @service=redpanda-connect content=10 custom_source=true kafka_timestamp_ms=10 label="" path=root.input.processors.0
^CINFO Received SIGINT, the service is closing @service=redpanda-connect
$ rpk topic consume foobar -o ':end' -X brokers=localhost:9093
{
"topic": "foobar",
"value": "1",
"timestamp": 1,
"partition": 0,
"offset": 0
}
{
"topic": "foobar",
"value": "2",
"timestamp": 2,
"partition": 0,
"offset": 1
}
{
"topic": "foobar",
"value": "4",
"timestamp": 4,
"partition": 0,
"offset": 2
}
{
"topic": "foobar",
"value": "3",
"timestamp": 3,
"partition": 0,
"offset": 3
}
{
"topic": "foobar",
"value": "6",
"timestamp": 6,
"partition": 0,
"offset": 4
}
{
"topic": "foobar",
"value": "5",
"timestamp": 5,
"partition": 0,
"offset": 5
}
{
"topic": "foobar",
"value": "8",
"timestamp": 8,
"partition": 0,
"offset": 6
}
{
"topic": "foobar",
"value": "7",
"timestamp": 7,
"partition": 0,
"offset": 7
}
{
"topic": "foobar",
"value": "9",
"timestamp": 9,
"partition": 0,
"offset": 8
}
{
"topic": "foobar",
"value": "10",
"timestamp": 10,
"partition": 0,
"offset": 9
}
Please check again and let me know if you can provide reproduction steps.
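To check a run like this without reading the output by eye, the records printed by rpk topic consume can be compared programmatically. Below is a minimal Python sketch, assuming the consumer output has been saved as text and that, as in this test data only, each record's value equals its timestamp. The helper names parse_records and mismatched are hypothetical and not part of rpk or Redpanda Connect.

```python
import json

def parse_records(text):
    """Parse back-to-back JSON objects, as printed by `rpk topic consume`."""
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over it here.
        if text[pos].isspace():
            pos += 1
            continue
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
    return records

def mismatched(records):
    """Return records whose timestamp differs from the integer in their value.

    Assumption: value == timestamp holds only for this synthetic test data.
    """
    return [r for r in records if int(r["value"]) != r["timestamp"]]

# Two records copied from the sink output above (offsets 2 and 3).
sample = '''
{"topic": "foobar", "value": "4", "timestamp": 4, "partition": 0, "offset": 2}
{"topic": "foobar", "value": "3", "timestamp": 3, "partition": 0, "offset": 3}
'''
records = parse_records(sample)
print(len(records), mismatched(records))
```

An empty list from mismatched means every record kept the timestamp it was produced with, even where the order in the sink differs from the order of production.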
Hi @mihaitodor, sorry for the late reply. Above is the output for the config in which I'm reading the messages from one topic (A) and sending them to another topic (B). Below is the output when I read from the second topic (B). As you can see, there is a difference in the time at which I'm receiving the events.
Problem statement:
I have published a message to a Kafka topic at 12:00. Subsequently, I am reading that message from the first topic and publishing it to a second topic. Under normal circumstances, the message would be inserted into the second topic at 12:01. However, I would like to modify the timestamp of the message being inserted into the second topic. Can this be achieved using the timestamp_ms property, considering that the second topic only receives messages through this process, with no other configurations or processors involved?
I want to set the timestamp_ms at which a message is inserted into a Kafka topic. I came across the field: https://docs.redpanda.com/redpanda-connect/components/outputs/kafka/#timestamp_ms
The above config is not working as expected (the same time is not being maintained; I have checked this by reading from secondTopic using another config). Can anyone suggest modifications?
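For reference, the field name suggests, and the linked documentation appears to describe, a timestamp expressed in milliseconds, so a wall-clock time such as 12:00 would need to be given as Unix epoch milliseconds. Here is a small Python sketch of that conversion. The date 2024-01-01 and the UTC timezone are assumptions chosen only for illustration, and to_epoch_ms is a hypothetical helper.

```python
from datetime import datetime, timezone

def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to Unix epoch milliseconds."""
    return int(dt.timestamp() * 1000)

# Assumed example times: the date and timezone are illustrative only.
published = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # written to the first topic
forwarded = datetime(2024, 1, 1, 12, 1, tzinfo=timezone.utc)  # re-published to the second topic

print(to_epoch_ms(published))                           # 1704110400000
print(to_epoch_ms(forwarded) - to_epoch_ms(published))  # 60000
```

If the original timestamp is carried over to the second topic, a consumer there should see the first value. If it instead sees a value about 60000 ms later, the record was stamped at re-publish time.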