Skip to content

Commit

Permalink
Fixed producer.Flush() (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 15, 2019
1 parent 9e3736a commit d6a6187
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions pulsar/impl_partition_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {

pi := p.pendingQueue.PeekLast().(*pendingItem)
pi.sendRequests = append(pi.sendRequests, &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
fr.err = e
fr.waitGroup.Done()
Expand Down Expand Up @@ -337,9 +338,12 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.pendingQueue.Poll()
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
if sr.callback != nil {
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
p.publishSemaphore.Release()
}

if sr.callback != nil {
msgID := newMessageId(
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
Expand Down

0 comments on commit d6a6187

Please sign in to comment.