Skip to content

Commit 62b7988

Browse files
author
Nijat K
committed
Clean-up, fix configuring threads
Signed-off-by: Nijat K <[email protected]>
1 parent 379f5f5 commit 62b7988

File tree

6 files changed

+8
-177
lines changed

6 files changed

+8
-177
lines changed

cpp/csp/adapters/websocket/ClientAdapterManager.cpp

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,7 @@ namespace csp::adapters::websocket {
1818

1919
ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
2020
: AdapterManager( engine ),
21-
// m_ioc(),
22-
// m_active( false ),
23-
// m_shouldRun( false ),
24-
// m_endpoint(!properties.get<bool>("dynamic") ?
25-
// std::make_unique<WebsocketEndpoint>( m_ioc, properties ) :
26-
// nullptr),
27-
// // m_endpoint( std::make_unique<WebsocketEndpoint>( m_ioc, properties ) ),
28-
// m_inputAdapter( nullptr ),
29-
// m_outputAdapter( nullptr ),
30-
// m_updateAdapter( nullptr ),
31-
// m_thread( nullptr ),
32-
m_properties( properties )
33-
// m_work_guard(properties.get<bool>("dynamic") ?
34-
// std::make_optional(boost::asio::make_work_guard(m_ioc)) :
35-
// std::nullopt),
36-
// m_dynamic( properties.get<bool>("dynamic") )
21+
m_properties( properties )
3722
{ }
3823

3924
ClientAdapterManager::~ClientAdapterManager()

cpp/csp/adapters/websocket/ClientAdapterManager.h

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
66
#include <csp/adapters/websocket/WebsocketClientTypes.h>
77
#include <csp/adapters/websocket/ClientInputAdapter.h>
8-
#include <csp/adapters/websocket/ClientOutputAdapter.h>
98
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
10-
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>
119
#include <csp/core/Enum.h>
1210
#include <csp/core/Hash.h>
1311
#include <csp/engine/AdapterManager.h>
@@ -26,50 +24,8 @@ namespace csp::adapters::websocket {
2624

2725
using namespace csp;
2826

29-
class ClientConnectionRequestAdapter;
30-
class ClientOutputAdapter;
3127
class WebsocketEndpointManager;
3228

33-
34-
// struct WebsocketClientStatusTypeTraits
35-
// {
36-
// enum _enum : unsigned char
37-
// {
38-
// ACTIVE = 0,
39-
// GENERIC_ERROR = 1,
40-
// CONNECTION_FAILED = 2,
41-
// CLOSED = 3,
42-
// MESSAGE_SEND_FAIL = 4,
43-
44-
// NUM_TYPES
45-
// };
46-
47-
// protected:
48-
// _enum m_value;
49-
// };
50-
51-
// struct ConnectPayloads {
52-
// std::vector<std::string> consumer_payloads;
53-
// std::vector<std::string> producer_payloads;
54-
// };
55-
56-
// struct EndpointConfig {
57-
// std::chrono::milliseconds reconnect_interval;
58-
// std::unique_ptr<boost::asio::steady_timer> reconnect_timer;
59-
// bool attempting_reconnect{false};
60-
// bool shutting_down{false}; // Track intentional shutdown
61-
62-
// // Keep the payload storage
63-
// std::vector<std::string> consumer_payloads;
64-
// std::vector<std::string> producer_payloads;
65-
66-
// EndpointConfig(boost::asio::io_context& ioc)
67-
// : reconnect_timer(std::make_unique<boost::asio::steady_timer>(ioc))
68-
// {}
69-
// };
70-
71-
// using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
72-
7329
class ClientAdapterManager final : public AdapterManager
7430
{
7531

@@ -96,57 +52,8 @@ class ClientAdapterManager final : public AdapterManager
9652
DateTime processNextSimTimeSlice( DateTime time ) override;
9753

9854
private:
99-
inline size_t validateCallerId(int64_t caller_id) const {
100-
if (caller_id < 0) {
101-
CSP_THROW(ValueError, "caller_id cannot be negative: " << caller_id);
102-
}
103-
return static_cast<size_t>(caller_id);
104-
}
105-
106-
// need some client info
107-
// This is a tuple of
108-
// (number of send calls to uri, set of caller id's subscribed to uri )
109-
// We use this information to keep track of how to route messages to/from
110-
// uri's, and when a uri connection can be closed.
111-
// using UriInfo = std::tuple<int32_t, std::unordered_set<uint64_t>>; //TODO remove
112-
// using OptWorkGuard = std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>;
11355
Dictionary m_properties;
11456
std::unique_ptr<WebsocketEndpointManager> m_endpointManager;
115-
// net::io_context m_ioc;
116-
// [[maybe_unused]] bool m_active;
117-
// [[maybe_unused]] bool m_shouldRun;
118-
// std::unique_ptr<WebsocketEndpoint> m_endpoint; // TODO remove
119-
// [[maybe_unused]] ClientInputAdapter* m_inputAdapter;
120-
// [[maybe_unused]] ClientOutputAdapter* m_outputAdapter;
121-
// ClientHeaderUpdateOutputAdapter* m_updateAdapter;
122-
// std::unique_ptr<std::thread> m_thread;
123-
// // For each subscribe call, which uri's it is subscribed to
124-
// std::vector<std::unordered_set<std::string>> m_subscribeFromUri;
125-
// // For each send call, which uri's it will send out to
126-
// std::vector<std::unordered_set<std::string>> m_sendToUri;
127-
128-
// // uri -> (send_calls, set of caller id's for the subscribtions)
129-
// // If send_calls is 0 (no adapter is sending out to that uri)
130-
// // AND the subscriptions set is empty, we can then shutdown the encpoint.
131-
// std::unordered_map<std::string, UriInfo> m_uriInfo; //TODO: remove
132-
// //unclear if this is needed to be on the object
133-
// std::vector<ClientConnectionRequestAdapter*> m_connectionRequestAdapters;
134-
135-
// // Bidirectional mapping using vectors since caller_ids are sequential
136-
// // Maybe not efficient? Should be good for small number of edges though
137-
// std::unordered_map<std::string, std::vector<bool>> m_endpoint_consumers; // endpoint_id -> vector[caller_id] for consuemrs
138-
// std::unordered_map<std::string, std::vector<bool>> m_endpoint_producers; // endpoint_id -> vector[caller_id] for producers
139-
140-
// // Quick lookup for caller's endpoints
141-
// std::vector< std::unordered_set<std::string> > m_consumer_endpoints; // caller_id -> set of endpoints they consume from
142-
// std::vector< std::unordered_set<std::string> > m_producer_endpoints; // caller_id -> set of endpoints they produce to
143-
// OptWorkGuard m_work_guard;
144-
// std::unordered_map<std::string, std::unique_ptr<WebsocketEndpoint>> m_endpoints;
145-
// std::unordered_map<std::string, ConnectPayloads> m_connect_payloads;
146-
// std::unordered_map<std::string, EndpointConfig> m_endpoint_configs;
147-
// std::vector<ClientInputAdapter*> m_inputAdapters;
148-
// std::vector<ClientOutputAdapter*> m_outputAdapters;
149-
// bool m_dynamic;
15057
};
15158

15259
}

cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,6 @@ void ClientHeaderUpdateOutputAdapter::executeImpl()
2424
auto endpoint = m_mgr -> getNonDynamicEndpoint();
2525
endpoint -> updateHeaders(std::move(headers));
2626
});
27-
28-
// Get the first e
29-
// ndpoint from the map and call updateHeaders
30-
// DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
31-
// for( auto& update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>() )
32-
// {
33-
// if( update -> key_isSet() && update -> value_isSet() ) headers->update( update->key(), update->value() );
34-
// }
3527
};
3628

3729
}

cpp/csp/adapters/websocket/WebsocketEndpointManager.cpp

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ WebsocketEndpointManager::WebsocketEndpointManager( ClientAdapterManager* mgr, c
2020
m_updateAdapter( nullptr ),
2121
m_thread( nullptr ),
2222
m_properties( properties ),
23-
m_work_guard(std::make_optional(boost::asio::make_work_guard(m_ioc))),
24-
// m_work_guard(properties.get<bool>("dynamic") ?
25-
// std::make_optional(boost::asio::make_work_guard(m_ioc)) :
26-
// std::nullopt),
23+
m_work_guard(boost::asio::make_work_guard(m_ioc)),
2724
m_dynamic( properties.get<bool>("dynamic") ){
2825
// Total number of subscribe and send function calls, set on the adapter manager
2926
// when is it created. Note, that some of the input adapters might have been
@@ -50,29 +47,23 @@ WebsocketEndpointManager::WebsocketEndpointManager( ClientAdapterManager* mgr, c
5047
void WebsocketEndpointManager::start(DateTime starttime, DateTime endtime) {
5148
// maybe restart here?
5249
m_shouldRun = true;
53-
// std::vector<std::thread> threads;
54-
5550
m_ioc.reset();
5651
if( !m_dynamic ){
5752
boost::asio::post(m_strand, [this]() {
5853
// We subscribe for both the subscribe and send calls
5954
// But we probably should check here.
6055
if( m_outputAdapters.size() == 1)
6156
handleConnectionRequest(Dictionary(m_properties), 0, false);
62-
// // If we have an input adapter call AND it's not pruned.
57+
// If we have an input adapter call AND it's not pruned.
6358
if( m_inputAdapters.size() == 1 && !adapterPruned(0))
6459
handleConnectionRequest(Dictionary(m_properties), 0, true);
6560
});
6661
}
67-
for (auto i = 0; i < m_num_threads; ++i) {
62+
for (size_t i = 0; i < m_num_threads; ++i) {
6863
m_threads.emplace_back(std::make_unique<std::thread>([this]() {
6964
m_ioc.run();
7065
}));
7166
}
72-
// m_thread = std::make_unique<std::thread>([this]() {
73-
// // m_ioc.reset();
74-
// m_ioc.run();
75-
// });
7667
};
7768

7869
bool WebsocketEndpointManager::adapterPruned( size_t caller_id ){
@@ -145,12 +136,8 @@ void WebsocketEndpointManager::setupEndpoint(const std::string& endpoint_id,
145136
size_t validated_id)
146137
{
147138
// Store the endpoint first
148-
// std::cout << "INSIDE SETUPENDPOINT \n";
149-
// m_endpoints[endpoint_id] = std::move(endpoint);
150-
151139
auto& stored_endpoint = m_endpoints[endpoint_id] = std::move(endpoint);
152140

153-
// Do this directly to not get any issues with race conditions or something. Make this atomic
154141
stored_endpoint->setOnOpen([this, endpoint_id, endpoint = stored_endpoint.get(), payload=std::move(payload), persist, is_consumer, validated_id]() {
155142
auto [iter, inserted] = m_endpoint_configs.try_emplace(endpoint_id, m_ioc);
156143
auto& config = iter->second;
@@ -422,8 +409,7 @@ void WebsocketEndpointManager::stop() {
422409
}
423410
});
424411
// Stop the work guard to allow the io_context to complete
425-
if (m_work_guard )
426-
m_work_guard.reset();
412+
m_work_guard.reset();
427413
// }
428414
// else{
429415
// if( m_active ) m_endpoint->stop( false );

cpp/csp/adapters/websocket/WebsocketEndpointManager.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ class WebsocketEndpointManager {
131131
}
132132
}
133133
using UriInfo = std::tuple<int32_t, std::unordered_set<uint64_t>>; //TODO remove
134-
using OptWorkGuard = std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>;
135134
std::unique_ptr<WebsocketEndpointManager> m_endpointManager;
136135
size_t m_num_threads;
137136
net::io_context m_ioc;
@@ -169,7 +168,7 @@ class WebsocketEndpointManager {
169168
// Quick lookup for caller's endpoints
170169
std::vector< std::unordered_set<std::string> > m_consumer_endpoints; // caller_id -> set of endpoints they consume from
171170
std::vector< std::unordered_set<std::string> > m_producer_endpoints; // caller_id -> set of endpoints they produce to
172-
OptWorkGuard m_work_guard;
171+
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
173172
std::unordered_map<std::string, std::unique_ptr<WebsocketEndpoint>> m_endpoints;
174173
std::unordered_map<std::string, ConnectPayloads> m_connect_payloads;
175174
std::unordered_map<std::string, EndpointConfig> m_endpoint_configs;

csp/adapters/websocket.py

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -463,26 +463,6 @@ def __init__(
463463
def _dynamic(self):
464464
return self._properties.get("dynamic", False)
465465

466-
# @staticmethod
467-
# def to_internal_connection_request(conn_req: ConnectionRequest) -> InternalConnectionRequest:
468-
# # TODO Use?
469-
# uri = conn_req.uri
470-
# reconnect_interval = conn_req.reconnect_interval
471-
# resp = urllib.parse.urlparse(uri)
472-
# if resp.hostname is None:
473-
# raise ValueError(f"Failed to parse host from URI: {uri}")
474-
475-
# assert reconnect_interval >= timedelta(seconds=1)
476-
# conn_req_dict = ConnectionRequest.to_dict()
477-
# update_dict = dict(
478-
# use_ssl=uri.startswith("wss"),
479-
# route=resp.path or "/", # resource shouldn't be empty string
480-
# host=resp.hostname,
481-
# port=_sanitize_port(uri, resp.port),
482-
# )
483-
# conn_req_dict.update(update_dict)
484-
# return InternalConnectionRequest(**conn_req_dict)
485-
486466
def _get_properties(self, conn_request: ConnectionRequest) -> dict:
487467
uri = conn_request.uri
488468
reconnect_interval = conn_request.reconnect_interval
@@ -554,22 +534,12 @@ def subscribe(
554534
caller_id = self._get_caller_id(is_subscribe=True)
555535
# Gives validation, more to start defining a common interface
556536
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=True).model_dump()
557-
# if connection_request is None and self._dynamic:
558-
# raise ValueError(
559-
# "'connection_request' must not be None if this adapter is dynamic. Use 'csp.null_ts(ConnectionRequest)' if this was intentional"
560-
# )
561537
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
562-
# if connection_request is not None:
563-
# request_dict = self._enrich_with_caller_id(connection_request, caller_id, is_subscribe=True)
564538
request_dict = csp.apply(
565539
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
566540
)
567-
# We just declare it here, so it gets included in the graph
568-
# since nothing is returned.
569-
# csp.print("req dict", request_dict)
541+
# Output adapter to handle connection requests
570542
_websocket_connection_request_adapter_def(self, request_dict, adapter_props)
571-
# If in dynamic mode, we wrap the message in a struct to include the url the data
572-
# is coming from
573543

574544
field_map = field_map or {}
575545
meta_field_map = meta_field_map or {}
@@ -584,6 +554,7 @@ def subscribe(
584554
properties["meta_field_map"] = meta_field_map
585555

586556
properties.update(adapter_props)
557+
# We wrap the message in a struct to note the url it comes from
587558
if self._dynamic:
588559
ts_type = self.get_wrapper_struct(ts_type=ts_type)
589560
return _websocket_input_adapter_def(self, ts_type, properties, push_mode=push_mode)
@@ -593,15 +564,6 @@ def send(self, x: ts["T"], connection_request: Optional[ts[List[ConnectionReques
593564
# Gives validation, more to start defining a common interface
594565
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=False).model_dump()
595566
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
596-
# if connection_request is None and self._dynamic:
597-
# raise ValueError(
598-
# "'connection_request' must not be None if this adapter is dynamic. Use 'csp.null_ts(ConnectionRequest)' if this was intentional"
599-
# )
600-
601-
# TODO:
602-
# Consider allowing header updates go through here
603-
# In non-dynamic mode
604-
# if connection_request is not None:
605567
request_dict = csp.apply(
606568
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
607569
)

0 commit comments

Comments
 (0)