Skip to content

Commit 165eace

Browse files
AMQP Stack polls for operations in a background thread. (Azure#4914)
There are two significant parts to this fix. The first is that the Connection object now has a method EnableAsyncOperation which registers the connection with a background thread which polls the connection. The EnableAsyncOperation needs to be callable from outside the connection because in normal client operation, the calls to open the connection are all internal to uAMQP. That means that the message sender and receiver need to call it when they're opened (and closed). The other major part of this change is that the AsyncOperationQueue now has a WaitForRequest API which does not normally poll (there is a test hook which enables polling but that is not normally used in most scenarios). The other part of this change fixes some concurrency issues associated with the Log::Stream functionality. Co-authored-by: Anton Kolesnyk <[email protected]> * Update sdk/core/azure-core/src/logger.cpp Co-authored-by: Anton Kolesnyk <[email protected]> * Update sdk/core/azure-core/test/ut/CMakeLists.txt Co-authored-by: Anton Kolesnyk <[email protected]> * Update sdk/core/azure-core/test/ut/logging_test.cpp Co-authored-by: Anton Kolesnyk <[email protected]> * Added ASAN as an option, converted transports to be bases to ensure consistancy --------- Co-authored-by: Anton Kolesnyk <[email protected]>
1 parent e0bda0b commit 165eace

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1247
-1507
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ Generated\ Files/
4747
[Tt]est[Rr]esult*/
4848
[Bb]uild[Ll]og.*
4949

50+
# CTest test results
51+
[Tt]esting/
52+
5053
# NUNIT
5154
*.VisualState.xml
5255
TestResult.xml

.vscode/cspell.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,13 @@
179179
"XSMB"
180180
],
181181
"overrides": [
182+
{
183+
"filename": "CMakePresets.json",
184+
"words": [
185+
"ASAN",
186+
"fsanitize"
187+
]
188+
},
182189
{
183190
"filename": "**/eng/pipelines/templates/**/*.yml",
184191
"words": [

CMakePresets.json

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,45 @@
166166
"BUILD_PERFORMANCE_TESTS": true
167167
}
168168
},
169+
{
170+
"name": "winhttp-transport",
171+
"displayName": "Enable WinHTTP Transport",
172+
"description": "Enable WinHTTP Transport (Hidden)",
173+
"hidden": true,
174+
"cacheVariables": {
175+
"BUILD_TRANSPORT_WINHTTP": true
176+
},
177+
"condition": {
178+
"type": "equals",
179+
"lhs": "${hostSystemName}",
180+
"rhs": "Windows"
181+
}
182+
},
183+
{
184+
"name": "curl-transport",
185+
"displayName": "Enable CURL Transport",
186+
"description": "Enable CURL Transport (Hidden)",
187+
"hidden": true,
188+
"cacheVariables": {
189+
"BUILD_TRANSPORT_CURL": true
190+
}
191+
},
192+
{
193+
"name": "enable-address-sanitizer",
194+
"displayName": "Enable Address Sanitizer",
195+
"description": "Enable Address Sanitizer (Hidden). Note: ASAN can be extremely disruptive, when enabling ASAN, make sure you run your application under the debugger.",
196+
"hidden": true,
197+
"environment": {
198+
"CFLAGS": "/fsanitize=address",
199+
"CXXFLAGS": "/fsanitize=address"
200+
},
201+
"cacheVariables": { "DISABLE_AZURE_CORE_OPENTELEMETRY": true },
202+
"condition": {
203+
"type": "equals",
204+
"lhs": "${hostSystemName}",
205+
"rhs": "Windows"
206+
}
207+
},
169208
{
170209
"name": "x86-static-debug-tests",
171210
"displayName": "x86 Debug With Tests, static",
@@ -204,19 +243,13 @@
204243
"name": "x64-static-debug-tests-curl",
205244
"displayName": "x64 Debug With Tests, static, libcurl",
206245
"description": "Windows x64 Debug build with Tests configured",
207-
"inherits": [ "x64-static", "debug-build", "enable-tests" ],
208-
"cacheVariables": {
209-
"BUILD_TRANSPORT_CURL": true
210-
}
246+
"inherits": [ "x64-static", "debug-build", "enable-tests", "curl-transport" ]
211247
},
212248
{
213249
"name": "x64-static-debug-tests-winhttp",
214250
"displayName": "x64 Debug With Tests, static, winhttp",
215251
"description": "Windows x64 Debug build with Tests configured",
216-
"inherits": [ "x64-static", "debug-build", "enable-tests" ],
217-
"cacheVariables": {
218-
"BUILD_TRANSPORT_WINHTTP": true
219-
}
252+
"inherits": [ "x64-static", "debug-build", "enable-tests", "winhttp-transport" ]
220253
},
221254
{
222255
"name": "x64-static-debug-tests-OpenSSL111",
@@ -234,30 +267,24 @@
234267
"inherits": [
235268
"x86-static",
236269
"release-build",
237-
"enable-tests"
238-
],
239-
"cacheVariables": {
240-
"BUILD_TRANSPORT_CURL": true
241-
}
270+
"enable-tests",
271+
"curl-transport"
272+
]
242273
},
243274
{
244275
"name": "x86-release-tests",
245276
"displayName": "x86 Release With Tests (Note: Does not link because it sets BUILD_TRANSPORT_CUSTOM)",
246-
"inherits": [ "x86", "release-build", "enable-tests" ],
277+
"inherits": [ "x86", "release-build", "enable-tests", "curl-transport", "winhttp-transport" ],
247278
"cacheVariables": {
248-
"BUILD_TRANSPORT_CURL": true,
249-
"BUILD_TRANSPORT_CUSTOM": true,
250-
"BUILD_TRANSPORT_WINHTTP": true
279+
"BUILD_TRANSPORT_CUSTOM": true
251280
}
252281
},
253282
{
254283
"name": "x64-release-tests",
255284
"displayName": "x64 Release With Tests (Note: Does not link because it sets BUILD_TRANSPORT_CUSTOM)",
256-
"inherits": [ "x64", "release-build", "enable-tests" ],
285+
"inherits": [ "x64", "release-build", "enable-tests", "curl-transport", "winhttp-transport" ],
257286
"cacheVariables": {
258-
"BUILD_TRANSPORT_CURL": true,
259-
"BUILD_TRANSPORT_CUSTOM": true,
260-
"BUILD_TRANSPORT_WINHTTP": true
287+
"BUILD_TRANSPORT_CUSTOM": true
261288
}
262289
},
263290
{
@@ -268,21 +295,15 @@
268295
"debug-build",
269296
"enable-tests",
270297
"enable-samples",
271-
"enable-perf"
272-
],
273-
"cacheVariables": {
274-
"BUILD_TRANSPORT_CURL": true,
275-
"BUILD_TRANSPORT_WINHTTP": true
276-
}
298+
"enable-perf",
299+
"winhttp-transport",
300+
"curl-transport"
301+
]
277302
},
278303
{
279304
"name": "x86-msvc-static-debug-perftests",
280305
"displayName": "x86 MSVC Debug static With Perf Tests and samples",
281-
"inherits": [ "x86-msvc-static", "debug-build", "enable-tests", "enable-perf", "enable-samples" ],
282-
"cacheVariables": {
283-
"BUILD_TRANSPORT_WINHTTP": true,
284-
"BUILD_TRANSPORT_CURL": true
285-
}
306+
"inherits": [ "x86-msvc-static", "debug-build", "enable-tests", "enable-perf", "enable-samples", "curl-transport", "winhttp-transport" ]
286307
},
287308
{
288309
"name": "x64-static-release-perftests",

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,24 @@
44

55
### Features Added
66

7+
- AMQP moved from a polling model to an asynchronous model.
8+
79
### Breaking Changes
810

11+
- Removed the `QueueSend` API from `MessageSender` because it was not compatible with the new asynchronous model.
12+
- The new asynchronous model requires the user to call `Close()` on the `MessageSender` and `MessageReceiver`
13+
to ensure operations have stabilized before destroying the object.
14+
- For connection listeners (primarily test scenarios), if you call `Open()` or `Listen()` on a connection, you MUST call `Close()`
15+
before the connection is destroyed.
16+
- The `Connection::Close()` method no longer requires that the caller provide connection disconnect information.
17+
- The `Session::End()` method no longer requires that the caller provide session disconnect information.
18+
- Several asserts have been added which will force termination of the running application if invariants have not been met.
19+
920
### Bugs Fixed
1021

22+
- Several fixes related to the new asynchronous model. Ensures that message senders and receivers are always closed,
23+
and that resources are released.
24+
1125
### Other Changes
1226

1327
## 1.0.0-beta.3 (2023-09-07)

sdk/core/azure-core-amqp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ add_library(Azure::azure-core-amqp ALIAS azure-core-amqp)
100100

101101
# coverage. Has no effect if BUILD_CODE_COVERAGE is OFF
102102
create_code_coverage(core azure-core-amqp azure-core-amqp-tests "tests?/*;samples?/*")
103+
create_code_coverage(core azure-core-amqp-uamqp azure-core-amqp-uamqp-tests "tests?/*;samples?/*")
103104

104105
# cspell:words aziotsharedutil
105106
# uAMQP's headers require the manual addition of umock_c, azure_macro_utils_c, and aziotsharedutil to the target link libraries.

sdk/core/azure-core-amqp/inc/azure/core/amqp/claims_based_security.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
6767
std::string const& audience,
6868
std::string const& token,
6969
Context const& context = {});
70-
void SetTrace(bool traceEnabled);
7170

7271
private:
7372
std::shared_ptr<ClaimsBasedSecurityImpl> m_impl;

sdk/core/azure-core-amqp/inc/azure/core/amqp/common/async_operation_queue.hpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#pragma once
55

66
#include <azure/core/context.hpp>
7+
#include <azure/core/diagnostics/logger.hpp>
8+
#include <azure/core/internal/diagnostics/log.hpp>
79

810
#include <condition_variable>
911
#include <list>
@@ -19,7 +21,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
1921
* "CompleteOperation" which sets the result, and a consumer calls WaitForResult which reads from
2022
* the AsyncOperationQueue. WaitForResult will block until a result is available.
2123
*/
22-
template <typename... T> class AsyncOperationQueue {
24+
template <typename... T> class AsyncOperationQueue final {
2325

2426
public:
2527
AsyncOperationQueue() = default;
@@ -63,6 +65,58 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
6365
} while (true);
6466
}
6567

68+
/**
69+
* @brief Wait for a result to be available.
70+
*
71+
* @param context The context to use for cancellation.
72+
* @param pollers optional set of pollers to call.
73+
* @return std::unique_ptr<std::tuple<T...>> The result.
74+
*
75+
* @remarks The pollers parameter is a TEST HOOK to allow test message receivers to interact
76+
* with the message loop. in general clients should NOT provide a poller.
77+
*
78+
*/
79+
template <class... Poller>
80+
std::unique_ptr<std::tuple<T...>> WaitForResult(Context const& context, Poller&... pollers)
81+
{
82+
// If the queue is not empty, return the first element.
83+
do
84+
{
85+
{
86+
std::unique_lock<std::mutex> lock(m_operationComplete);
87+
88+
if (!m_operationQueue.empty())
89+
{
90+
std::unique_ptr<std::tuple<T...>> rv;
91+
rv = std::move(m_operationQueue.front());
92+
m_operationQueue.pop_front();
93+
return rv;
94+
}
95+
96+
// There's nothing in the queue, wait until something is put into the queue.
97+
// This will block until either something is put into the queue or the context is
98+
// cancelled.
99+
m_operationCondition.wait_for(
100+
lock, std::chrono::milliseconds(100), [this, &context]() -> bool {
101+
// If the context is cancelled, we should return immediately.
102+
if (context.IsCancelled())
103+
{
104+
return true;
105+
}
106+
return !m_operationQueue.empty();
107+
});
108+
109+
if (context.IsCancelled())
110+
{
111+
return nullptr;
112+
}
113+
}
114+
// Note: We need to call Poll() *outside* the lock because the poller is going to call the
115+
// CompleteOperation function.
116+
Poll(pollers...);
117+
} while (true);
118+
}
119+
66120
// Clear any pending elements from the queue. This may be needed because some queued elements
67121
// may have ordering dependencies that need to be cleared before the object containing the queue
68122
// can be released.

sdk/core/azure-core-amqp/inc/azure/core/amqp/common/global_state.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33

44
#pragma once
55

6+
#include <list>
7+
#include <memory>
8+
#include <mutex>
9+
#include <thread>
10+
611
namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {
712

813
/**
@@ -15,11 +20,35 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
1520
*
1621
*/
1722

23+
class Pollable {
24+
public:
25+
virtual void Poll() = 0;
26+
virtual ~Pollable() = default;
27+
};
1828
class GlobalStateHolder final {
1929
GlobalStateHolder();
2030
~GlobalStateHolder();
2131

32+
std::list<std::shared_ptr<Pollable>> m_pollables;
33+
std::mutex m_pollablesMutex;
34+
std::thread m_pollingThread;
35+
bool m_stopped{false};
36+
2237
public:
2338
static GlobalStateHolder* GlobalStateInstance();
39+
40+
GlobalStateHolder(GlobalStateHolder const&) = delete;
41+
GlobalStateHolder& operator=(GlobalStateHolder const&) = delete;
42+
43+
GlobalStateHolder(GlobalStateHolder&&) = delete;
44+
GlobalStateHolder& operator=(GlobalStateHolder&&) = delete;
45+
46+
void AddPollable(std::shared_ptr<Pollable> pollable);
47+
48+
void RemovePollable(std::shared_ptr<Pollable> pollable)
49+
{
50+
std::lock_guard<std::mutex> lock(m_pollablesMutex);
51+
m_pollables.remove(pollable);
52+
}
2453
};
2554
}}}}} // namespace Azure::Core::Amqp::Common::_detail

sdk/core/azure-core-amqp/inc/azure/core/amqp/connection.hpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
3838
class TestMessages_SenderOpenClose_Test;
3939
class TestMessages_TestLocalhostVsTls_Test;
4040
class TestMessages_SenderSendAsync_Test;
41+
class TestMessages_SenderOpenClose_Test;
42+
class TestMessages_ReceiverOpenClose_Test;
43+
class TestMessages_ReceiverReceiveAsync_Test;
44+
4145
}}}} // namespace Azure::Core::Amqp::Tests
4246
#endif // TESTING_BUILD
4347
#if defined(SAMPLES_BUILD)
@@ -153,6 +157,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
153157
Error,
154158
};
155159

160+
std::ostream& operator<<(std::ostream& stream, ConnectionState const value);
161+
156162
class Connection;
157163

158164
/** @brief The ConnectionEvents interface defines a series of events triggered on a connection
@@ -336,6 +342,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
336342
* will be opened implicitly by a Session object derived from the connection. It primarily
337343
* exists as a test hook.
338344
*
345+
* @remarks If you call Open() or Listen(), then you MUST call Close() when you are done with
346+
* the connection, BEFORE destroying it.
347+
*
339348
*/
340349
void Open();
341350

@@ -348,6 +357,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
348357
* will be opened implicitly by a Session object derived from the connection. It primarily
349358
* exists as a test hook.
350359
*
360+
* @remarks If you call Open() or Listen(), then you MUST call Close() when you are done with
361+
* the connection, BEFORE destroying it.
351362
*/
352363
void Listen();
353364

@@ -361,11 +372,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
361372
* will be closed implicitly by a Session object derived from the connection. It primarily
362373
* exists as a test hook.
363374
*
375+
* @remarks If you have NOT called Open() or Listen(), then calling this is an error.
376+
*
364377
*/
365378
void Close(
366-
std::string const& condition,
367-
std::string const& description,
368-
Models::AmqpValue info);
379+
std::string const& condition = {},
380+
std::string const& description = {},
381+
Models::AmqpValue info = {});
369382

370383
/** @brief Gets host configured by the connection.
371384
*
@@ -450,6 +463,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
450463
friend class Azure::Core::Amqp::Tests::TestMessages_SenderOpenClose_Test;
451464
friend class Azure::Core::Amqp::Tests::TestMessages_TestLocalhostVsTls_Test;
452465
friend class Azure::Core::Amqp::Tests::TestMessages_SenderSendAsync_Test;
466+
friend class Azure::Core::Amqp::Tests::TestMessages_SenderOpenClose_Test;
467+
453468
#endif // TESTING_BUILD
454469
#if SAMPLES_BUILD
455470
friend int LocalServerSample::LocalServerSampleMain();

sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
3232
Closing,
3333
Error,
3434
};
35+
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState const& state);
3536

3637
enum class ReceiverSettleMode
3738
{

0 commit comments

Comments
 (0)