
Provide produceMessageSync operation #90

Closed

felixmulder opened this issue Apr 9, 2019 · 17 comments
@felixmulder
Contributor

Currently, we have a portion of our application where we need to know if the message was successfully sent to the broker. hw-kafka-client currently provides this by allowing callbacks on deliveries.

Fortunately, there's quite an easy way to support this using the librdkafka bindings: https://github.com/edenhill/librdkafka/wiki/Sync-producer

If it'd be of use to others - we can contribute this.

@AlexeyRaga
Member

It can be trickier than it seems in hw-kafka-client.
What that code does is send a message and then wait in an rd_kafka_poll loop until the callback sets some magic global variable.

hw-kafka-client has a green thread for calling rd_kafka_poll, so if you also do it in some C code, you essentially have two loops.

You can model it much more simply by calling produceMessage and then flushProducer.
What flushProducer does is wait until librdkafka's internal producer queue is empty, which means that all the delivery reports have been received and the messages have been acknowledged by the broker.

Typically you use flushProducer at some "checkpoints", for example before committing offsets (to make sure that all your work is safe), but nothing prevents you from calling it after producing each message...
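
To illustrate, a minimal sketch of this produce-then-flush pattern (assuming the current produceMessage and flushProducer signatures; a sketch, not a definitive implementation):

import Kafka.Producer

-- Enqueue a single record, then block until librdkafka's internal
-- queue is drained, i.e. its delivery report has been received.
produceAndWait :: KafkaProducer -> ProducerRecord -> IO (Maybe KafkaError)
produceAndWait producer record = do
  err <- produceMessage producer record   -- asynchronous enqueue
  case err of
    Just e  -> pure (Just e)              -- enqueueing itself failed
    Nothing -> Nothing <$ flushProducer producer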

Would this way be sufficient?

P.S. flushProducer is currently implemented pretty much like the loop in your example, but it could instead use librdkafka's own function (which presumably does the same thing): #69

@felixmulder
Contributor Author

I see. The problem with that solution is that, as a producer, I cannot pinpoint my data without some reconciliation logic. I guess flushProducer would make sure that the message is sent and then logged properly, but it forces a larger portion of the application to synchronize on delivery reports...

Maybe I can restate it like this - as a client, I'd like to be able to produce a message and receive its delivery report as the returned value:

produceMessage ::
     MonadIO m
  => KafkaProducer
  -> ProducerRecord
  -> m (Either KafkaError DeliveryReport)

I think the err pointer isn't really global - it's simply registered with librdkafka. I'll have a look at what a potential implementation would look like and re-sync with you.

@AlexeyRaga
Member

Ah, I see, they update the message's opaque...

I guess we can do that; the opaque is currently not used at all (we pass some dummy value), so we could just do it for all the messages.

In hw-kafka-client we use a delivery report wrapper anyway, and we can add whatever we want to that wrapper. So we can always update the message's opaque, and the producer can always set it.

If we do that for all messages, then it can be transparent for all the existing code, and we can implement produceSync, too.

@felixmulder
Contributor Author

🎉

@tscholak

tscholak commented Jul 24, 2019

Hello @AlexeyRaga and @felixmulder, I have an issue that I believe is very much related to what you have been working on here. Please let me elaborate.
I’ve been using hw-kafka-client now for several weeks together with pipes. To that end, I wrote my own pipes-kafka library based on the conduit library published by the hw org.
Early on I realized that, in order to guarantee at-least-once semantics, I mustn’t auto-commit offsets for the input topic I’m listening to. Instead, I should only commit offsets once I can be sure that the message has been processed successfully. In my case, that includes that an outgoing message has been successfully published to another topic. So, in essence (using pipes):

runEffect $ kafkaSource >-> tee (map f >-> kafkaSink) >-> flushProducerAndCommit

f here is some pure function that transforms ConsumerRecords into ProducerRecords, and flushProducerAndCommit first flushes the KafkaProducer underlying the kafkaSink (an effectful pipes Consumer) and then commits the offset of the ConsumerRecord using the KafkaConsumer underlying the kafkaSource (an effectful pipes Producer).
What I found is that flushing after every message kills performance. I thought of two mitigations:

  1. do not flush and do not commit on every message, but only every x seconds.
  2. somehow use the DeliveryReport callback to commit the offsets.

I was able to prototype 1), but didn’t see how I could do 2), given that the DeliveryReport only ever has the ProducerRecord (or an error) and that one can only register callbacks per KafkaProducer, not per individual ProducerRecord. That’s too bad, because conceptually I’d prefer 2).

Do you have any insight to share?
Thanks

@AlexeyRaga
Member

AlexeyRaga commented Jul 24, 2019

Hi @tscholak!

I see the pain and I may have a way to deal with it.

How librdkafka works internally is that when you consume a message, it stores its offset in some internal structure in memory. When you call commit, librdkafka looks at this memory and that's how it knows what offsets are to be committed. The same happens with auto commit.

So the "problem" is that the offsets are "stored" at the time the message is consumed, not at the time when it is "handled".

But that can be switched off! When we set the noAutoOffsetStore property while creating a consumer, librdkafka doesn't try to automatically remember offsets for consumed messages!

Now we can use the storeOffsets or storeOffsetMessage functions to tell the consumer the offsets of the messages we have actually handled, at the point where we are sure it is safe for them to be committed.

And with this we can keep auto commit on.
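
A rough sketch of this setup (assuming hw-kafka-client's ConsumerProperties combinators; handleRecord is just a placeholder for the actual processing):

{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import Kafka.Consumer

consumerProps :: ConsumerProperties
consumerProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "my-group")
  <> noAutoOffsetStore  -- don't store offsets automatically on consume

-- Placeholder for the actual processing step.
handleRecord :: ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ()
handleRecord _ = pure ()

-- Poll, handle, and only then mark the offset as safe to auto-commit.
processOne :: KafkaConsumer -> IO ()
processOne consumer = do
  msg <- pollMessage consumer (Timeout 1000)
  case msg of
    Left _    -> pure ()
    Right rec -> do
      handleRecord rec
      _ <- storeOffsetMessage consumer rec  -- now eligible for auto commit
      pure ()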

Does this approach help with your issue?

@tscholak

tscholak commented Jul 24, 2019

Hi @AlexeyRaga,
Thanks for the prompt response. Your approach is useful, but not fully so. It solves a problem that I wasn’t even aware existed! 😆
What you are telling me is that what I was doing was wrong: I thought that the commitOffsetMessage function commits the Offset that comes with the supplied ConsumerRecord, but, as you are telling me now, this is not the case. Looking at the code of the function, I now see that it uses the ConsumerRecord merely to construct a TopicPartition value, presumably to be able to refer to the partition’s internal offset store. Hence, in my example above, calling commitOffsetMessage at the end of the chain will commit all offsets for messages consumed but not necessarily processed until then. That’s a big difference.
I will change my workflow and do what you suggested: deactivate the auto offset store and store offsets manually, using storeOffsetMessage instead of commitOffsetMessage at the end of the chain.

However, the main problem I was hinting at remains unsolved: How do I know that the message went out to the output topic successfully? Right now I have to flush (either for each message, which is slow, or in chunks or time intervals, as described by option 1) above). Is there a way to call storeOffsetMessage with the right/corresponding ConsumerRecord (from which the outgoing ProducerRecord was derived by virtue of f) from within the DeliveryReport callback of the KafkaProducer? (I’d rather not embed the incoming ConsumerRecord in the outgoing ProducerRecord, because that would pollute the output topic.)

Edit: Earlier @AlexeyRaga said,

in hw-kafka-client we use delivery report wrapper anyway, we can add whatever we want to that wrapper.

This would suggest that DeliveryReport could possibly be redefined as

data DeliveryReport a
  = DeliverySuccess a Offset
  | DeliveryFailure a KafkaError
  | NoMessageError KafkaError

where the type variable a replaces the current ProducerRecord.

produceMessage would then need to be something like:

produceMessage
  :: forall a m
   . MonadIO m
  => HasProducerRecord a
  => KafkaProducer
  -> a
  -> m (Maybe KafkaError)

and

class HasProducerRecord a where
  toProducerRecord :: a -> ProducerRecord

instance HasProducerRecord ProducerRecord where
  toProducerRecord = id

@AlexeyRaga
Member

@tscholak

I thought that the commitOffsetMessage function commits the Offset that comes with the supplied ConsumerRecord, but, as you are telling me now, this is not the case.

No, I apologise for not explaining it clearly.
You are right: commitOffsetMessage will commit offsets for a given message. commitAllOffsets, however, will commit the stored offsets.
I am saying that if, instead of calling commitOffsetMessage, you call storeOffsetMessage, you can keep auto commit on.

How do I know that the message went out to the output topic successfully?
You can have delivery reports. From them you can get offsets for the committed messages, and then you can call storeOffsets for these offsets so auto commit would then do the job.

Or you can flush the producer periodically.
We do this sometimes, using onEveryN or everyNSeconds, although I don't like it that much.
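
(For context, onEveryN and everyNSeconds are our own little helpers, not part of hw-kafka-client. A hypothetical everyNSeconds could look roughly like this:)

import Control.Monad (when)
import Data.IORef
import Data.Time.Clock (NominalDiffTime, diffUTCTime, getCurrentTime)

-- Build a wrapper that runs an action at most once every n seconds;
-- we use it to gate flushProducer/commit calls to checkpoint intervals.
everyNSeconds :: NominalDiffTime -> IO (IO () -> IO ())
everyNSeconds n = do
  lastRun <- getCurrentTime >>= newIORef
  pure $ \action -> do
    now  <- getCurrentTime
    prev <- readIORef lastRun
    when (diffUTCTime now prev >= n) $ do
      writeIORef lastRun now
      action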

@tscholak

Thanks again @AlexeyRaga for clarification!

You can have delivery reports. From them you can get offsets for the committed messages, and then you can call storeOffsets for these offsets so auto commit would then do the job.

I’m not sure I understand how this is possible. How can the DeliveryReport of a message outgoing to topic, say, out contain the offset of an incoming message from a different topic, in?

I think, at present, it would require manually enriching the outgoing ProducerRecords with that information.

@AlexeyRaga
Member

@tscholak I see how it is not convenient, but we could correlate by ProducerRecord or by its key/payload.

For example:

  • we get a ConsumerRecord
  • we make a ProducerRecord out of it
  • we put the consumer offset and the producer payload into some Map in a state
  • when we get a DeliveryReport, we take the ProducerRecord payload from it and use it to look up the corresponding consumer offset(s) in the Map
  • we call storeOffsets with these consumer offsets
  • we remove these entries from the state.

Granted, it isn't particularly nice :(
I haven't actually tried it myself, since we typically use that everyNSeconds thing, but it seems doable...
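
A sketch of that correlation (the callback handler only; it would be registered as the producer's delivery report callback, and keying the Map by producer payload assumes payloads are unique while in flight):

import           Data.ByteString (ByteString)
import           Data.Foldable (for_)
import           Data.IORef
import qualified Data.Map.Strict as Map
import           Kafka.Consumer (KafkaConsumer, TopicPartition, storeOffsets)
import           Kafka.Producer (DeliveryReport (..), ProducerRecord (..))

-- Pending deliveries: producer payload -> source (consumer) offset.
type Pending = IORef (Map.Map (Maybe ByteString) TopicPartition)

-- To be registered as the producer's delivery report callback.
onDelivery :: Pending -> KafkaConsumer -> DeliveryReport -> IO ()
onDelivery pending consumer dr = case dr of
  DeliverySuccess rec _offset -> do
    m <- readIORef pending
    for_ (Map.lookup (prValue rec) m) $ \tp -> do
      _ <- storeOffsets consumer [tp]  -- now safe for auto commit
      modifyIORef' pending (Map.delete (prValue rec))
  _ -> pure ()  -- failed delivery: leave the source offset unstored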

We could think about doing better by allowing ProducerRecord to carry some extra information with it; librdkafka already has an opaque field for this purpose.

Here is my braindump:

We could store some provenance representing "source messages offsets" in that opaque field:

data ProducerRecord = ProducerRecord
  { prTopic     :: !TopicName
  , prPartition :: !ProducePartition
  , prKey       :: Maybe ByteString
  , prValue     :: Maybe ByteString
  , provenance  :: [(TopicName, PartitionId, Offset)]
  } deriving (Eq, Show, Typeable, Generic)

Therefore we would immediately know the "source" messages' offsets in a delivery report.

Or we could go completely generic and have something similar to:

setOpaque :: Data.Binary.Binary a => a -> ProducerRecord -> ProducerRecord

But now, when I get a delivery report, how do I know which type I should decode that opaque into? Make it a type parameter of ProducerRecord? And then parametrise the producer itself? Or something else?

I don't know what the solution would be, in general :(

Maybe the solution with the list of "source offsets" is good enough?

@felixmulder
Contributor Author

Hey folks - I created this a while back to solve our needs: http://hackage.haskell.org/package/kafka-client-sync-0.1.1.0/docs/Kafka-Producer-Sync.html

It could probably be solved in a better way using the opaque field, but we're pretty happy with it and have been using it flawlessly in prod for about a month.

@sir4ur0n
Contributor

Hi, we just had the exact same requirement of producing messages synchronously, and found this issue.
Luckily I read through it all and found that the last message links a library that solves this very issue 👍

What do you guys think about integrating/merging this lib back into hw-kafka-client? This looks like a common need, and a useful one on top of it.

Thank you @AlexeyRaga @felixmulder !

@felixmulder
Contributor Author

@sir4ur0n - it'll be a lot easier to do this once we've merged something that gives us a "per produced message callback". This PR does that: #121

Hoping Alexey will have a look at it sometime soon, but I haven't poked him too hard yet :)

@AlexeyRaga
Member

I am sorry, I will find time to look at PRs for the repos that I maintain sometime soon.
Very busy currently with looking for a new job, finishing the current one, etc. :)

@felixmulder
Contributor Author

No worries Alexey, good luck on the job search!

@felixmulder
Contributor Author

I think we can close this now that the per-message callback has been merged. There's an example in the tests now showing how to do this; we could potentially add some docs showing this pattern as well:

https://github.com/haskell-works/hw-kafka-client/blob/master/tests-it/Kafka/IntegrationSpec.hs#L118-L134
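
For the record, the gist of that test, assuming the per-message callback variant (produceMessage') added in #121 returns Either ImmediateError () and blocking on an MVar until the report arrives:

import Control.Concurrent.MVar
import Kafka.Producer

-- Produce one record synchronously: block until its delivery report.
produceSync :: KafkaProducer -> ProducerRecord -> IO DeliveryReport
produceSync producer record = do
  var <- newEmptyMVar
  res <- produceMessage' producer record (putMVar var)
  case res of
    Left (ImmediateError err) -> pure (NoMessageError err)
    Right ()                  -> takeMVar var  -- wait for the callback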

@felixmulder
Contributor Author

Done in #124
