Skip to content

Commit

Permalink
Clean-up partition list on storeoffsetMessage and storeOffsets
Browse files Browse the repository at this point in the history
  • Loading branch information
lazamar authored and AlexeyRaga committed Mar 10, 2024
1 parent e80c322 commit 17451f5
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Foreign hiding (void)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), rdKafkaSeekPartitions, rdKafkaErrorDestroy, rdKafkaErrorCode, newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..))
Expand Down Expand Up @@ -185,15 +185,15 @@ storeOffsetMessage :: MonadIO m
-> ConsumerRecord k v
-> m (Maybe KafkaError)
storeOffsetMessage k m =
liftIO $ toNativeTopicPartitionListNoDispose [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k
liftIO $ toNativeTopicPartitionList [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k

-- | Stores offsets locally
storeOffsets :: MonadIO m
=> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
storeOffsets k ps =
liftIO $ toNativeTopicPartitionListNoDispose ps >>= commitOffsetsStore k
liftIO $ toNativeTopicPartitionList ps >>= commitOffsetsStore k

-- | Commit offsets for all currently assigned partitions.
commitAllOffsets :: MonadIO m
Expand Down

0 comments on commit 17451f5

Please sign in to comment.