Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpointImpl(
throw std::runtime_error("missing offset number");
}

checkpoint.Offset = std::stol(temp);
checkpoint.Offset = temp;
}

void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateOwnership(
Expand Down Expand Up @@ -59,7 +59,7 @@ Azure::Messaging::EventHubs::BlobCheckpointStore::CreateCheckpointBlobMetadata(

if (checkpoint.Offset.HasValue())
{
metadata["offset"] = std::to_string(checkpoint.Offset.Value());
metadata["offset"] = checkpoint.Offset.Value();
}
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
101,
std::string("101"),
202,
});

Expand All @@ -86,14 +86,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(101, checkpoints[0].Offset.Value());
EXPECT_EQ("101", checkpoints[0].Offset.Value());

checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
102,
std::string("102"),
203,
});

Expand All @@ -105,7 +105,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(102, checkpoints[0].Offset.Value());
EXPECT_EQ("102", checkpoints[0].Offset.Value());
}

TEST_P(BlobCheckpointStoreTest, TestOwnerships)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/// @brief The partition ID for the corresponding checkpoint.
std::string PartitionId{};
/// @brief The offset of the last successfully processed event.
Azure::Nullable<int64_t> Offset{};
Azure::Nullable<std::string> Offset{};
/// @brief The sequence number of the last successfully processed event.
Azure::Nullable<int64_t> SequenceNumber{};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
* The offset is a marker or identifier for an event within the Event Hubs stream.
* The identifier is unique within a partition of the Event Hubs stream.
*/
Azure::Nullable<std::uint64_t> Offset;
Azure::Nullable<std::string> Offset;

/** @brief The partition key for sending a message to a partition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/** The sequence number of the last observed event to be enqueued in the partition. */
int64_t LastEnqueuedSequenceNumber{};
/** The offset of the last observed event to be enqueued in the partition */
int64_t LastEnqueuedOffset{};
std::string LastEnqueuedOffset{};

/** The date and time, in UTC, that the last observed event was enqueued in the partition. */
Azure::DateTime LastEnqueuedTimeUtc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
*@remark NOTE: offsets are not stable values, and might refer to different events over time
* as the Event Hub events reach their age limit and are discarded.
*/
Azure::Nullable<int64_t> Offset;
Azure::Nullable<std::string> Offset;

/**@brief SequenceNumber will start the consumer after the specified sequence number. Can be
* exclusive or inclusive, based on the Inclusive property.
Expand Down
16 changes: 2 additions & 14 deletions sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
auto dateTime = Azure::DateTime{Azure::DateTime::time_point{timePoint}};
EnqueuedTime = dateTime;
}
else if (key == _detail::OffsetNumberAnnotation)
else if (key == _detail::OffsetAnnotation)
{
switch (item.second.GetType())
{
case Azure::Core::Amqp::Models::AmqpValueType::Ulong:
Offset = item.second;
break;
case Azure::Core::Amqp::Models::AmqpValueType::Long:
Offset = static_cast<int64_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::Uint:
Offset = static_cast<uint32_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::Int:
Offset = static_cast<int32_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::String:
Offset = std::strtoul(static_cast<std::string>(item.second).c_str(), nullptr, 10);
Offset = static_cast<std::string>(item.second);
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'"
+ std::to_string(startPosition.Offset.Value()) + "'";
+ startPosition.Offset.Value() + "'";
}
if (startPosition.SequenceNumber.HasValue())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail

constexpr const char* PartitionKeyAnnotation = "x-opt-partition-key";
constexpr const char* SequenceNumberAnnotation = "x-opt-sequence-number";
constexpr const char* OffsetNumberAnnotation = "x-opt-offset";
constexpr const char* OffsetAnnotation = "x-opt-offset";
constexpr const char* EnqueuedTimeAnnotation = "x-opt-enqueued-time";

constexpr const char* EventHubsServiceScheme = "amqps://";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
properties.BeginningSequenceNumber = bodyMap["begin_sequence_number"];
properties.LastEnqueuedSequenceNumber = bodyMap["last_enqueued_sequence_number"];
// For <reasons> the last enqueued offset is returned as a string. Convert to an int64.
properties.LastEnqueuedOffset = std::strtoull(
static_cast<std::string>(bodyMap["last_enqueued_offset"]).c_str(), nullptr, 10);
properties.LastEnqueuedOffset = static_cast<std::string>(bodyMap["last_enqueued_offset"]);

properties.LastEnqueuedTimeUtc = Azure::DateTime(std::chrono::system_clock::from_time_t(
std::chrono::duration_cast<std::chrono::seconds>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
Azure::Nullable<int64_t> sequenceNumber;

Azure::Nullable<int64_t> offsetNumber;
Azure::Nullable<std::string> offset;

for (auto const& pair : amqpMessage.MessageAnnotations)
{
Expand All @@ -43,13 +43,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong)
sequenceNumber = static_cast<int64_t>(pair.second);
}
if (pair.first == _detail::OffsetNumberAnnotation)
if (pair.first == _detail::OffsetAnnotation)
{
if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Int
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Uint
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Long
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong)
offsetNumber = static_cast<int64_t>(pair.second);
if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::String)
offset = static_cast<std::string>(pair.second);
}
}

Expand All @@ -58,8 +55,8 @@ namespace Azure { namespace Messaging { namespace EventHubs {
m_consumerClientDetails.EventHubName,
m_consumerClientDetails.FullyQualifiedNamespace,
m_partitionId,
sequenceNumber,
offsetNumber};
offset,
sequenceNumber};

m_checkpointStore->UpdateCheckpoint(checkpoint, context);
}
Expand All @@ -77,7 +74,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
sequenceNumber = eventData->SequenceNumber.Value();
}
uint64_t offset{};
std::string offset{};
if (!eventData->Offset.HasValue())
{
offset = eventData->Offset.Value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
101,
std::string("101"),
202,
});

Expand All @@ -68,14 +68,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(101, checkpoints[0].Offset.Value());
EXPECT_EQ("101", checkpoints[0].Offset.Value());

checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
102,
std::string("102"),
203,
});

Expand All @@ -87,7 +87,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(102, checkpoints[0].Offset.Value());
EXPECT_EQ("102", checkpoints[0].Offset.Value());
}

TEST_F(CheckpointStoreTest, TestOwnerships)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,11 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= 54644;
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 54644);
ASSERT_FALSE(receivedEventData.Offset); // Offset must be a string value, not a numeric value.
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
Expand All @@ -218,12 +217,12 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= "54644";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 54644);
EXPECT_EQ(receivedEventData.Offset.Value(), "54644");
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
Expand All @@ -232,54 +231,12 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= static_cast<uint32_t>(53);
= "53";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 53);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int32_t>(57);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 57);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<uint64_t>(661011);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 661011);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int64_t>(1412612);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 1412612);
EXPECT_EQ(receivedEventData.Offset.Value(), "53");
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,19 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
Azure::Core::Context context{Azure::DateTime::clock::now() + std::chrono::milliseconds(50)};
EXPECT_ANY_THROW(processor.NextPartitionClient(context));

{
auto partitionClientIterator = partitionClients.begin();
auto partitionClient = partitionClientIterator->second;
GTEST_LOG_(INFO) << "Erase client for partition " << partitionClientIterator->first;
partitionClients.erase(partitionClientIterator);
partitionClient->Close();

GTEST_LOG_(INFO) << "Get next partition client after releasing one.";
auto partitionClientNew = processor.NextPartitionClient();

GTEST_LOG_(INFO) << "Found client for partition " << partitionClientNew->PartitionId();
}

while (!partitionClients.empty())
{
auto partitionClientIterator = partitionClients.begin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
// Round trip a message with a binary body using an offset filter.
TEST_P(RoundTripTests, SendAndReceiveBinaryDataOffset_LIVEONLY_)
{
int64_t startOffset = 0;
std::string startOffset = "0";
{
auto producer{CreateProducerClient()};
auto partitionProperties = producer->GetPartitionProperties("1");
Expand Down