Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,16 @@ static int handle_tagged_message(net_abstraction_t net, int fed_id) {
" Discarding message and closing the network connection.",
env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time,
intended_tag.microstep);
// Free the allocated memory before returning
_lf_done_using(message_token);
// Close network abstraction, reading any incoming data and discarding it.
shutdown_net(_fed.net_for_inbound_p2p_connections[fed_id], false);
_fed.net_for_inbound_p2p_connections[fed_id] = NULL;
// The token was freshly allocated by _lf_new_token with ref_count 0 and was never
// scheduled, so _lf_done_using would incorrectly treat it as already freed.
// Use _lf_free_token directly, which handles ref_count == 0 correctly.
_lf_free_token(message_token);
#ifdef FEDERATED_DECENTRALIZED
_lf_decrement_tag_barrier_locked(env);
#endif
// Close the connection to unblock the listener, but do not free the memory;
// lf_terminate_execution will free it after joining the listener thread.
close_net(net, false);
LF_MUTEX_UNLOCK(&env->mutex);
return -1;
} else {
Expand Down Expand Up @@ -1508,23 +1513,21 @@ static void* listen_to_rti_net(void* args) {
// Listen for messages from the federate.
while (!_lf_termination_executed) {
// Check whether the RTI network abstraction is still valid.
if (_fed.net_to_RTI == NULL) {
lf_print_warning("network abstraction to the RTI unexpectedly closed.");
if (_fed.net_to_RTI == NULL || !is_net_open(_fed.net_to_RTI)) {
lf_print_warning("network connection to the RTI unexpectedly closed.");
return NULL;
}
// Read one byte to get the message type.
// This will exit if the read fails.
int read_failed = read_from_net(_fed.net_to_RTI, 1, buffer);
if (read_failed < 0) {
lf_print_error("Connection to the RTI was closed by the RTI with an error. Considering this a soft error.");
shutdown_net(_fed.net_to_RTI, false);
_fed.net_to_RTI = NULL;
close_net(_fed.net_to_RTI, false);
return NULL;
} else if (read_failed > 0) {
// EOF received.
lf_print_info("Connection to the RTI closed with an EOF.");
shutdown_net(_fed.net_to_RTI, false);
_fed.net_to_RTI = NULL;
close_net(_fed.net_to_RTI, false);
return NULL;
}
switch (buffer[0]) {
Expand Down Expand Up @@ -1647,22 +1650,17 @@ void lf_terminate_execution(environment_t* env) {
}

LF_PRINT_DEBUG("Closing incoming P2P network abstractions.");
// Close any incoming P2P network abstractions that are still open.
// Close connections to unblock any listener threads that are blocking on reads,
// but do NOT free the memory yet because listener threads hold local pointers.
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {
shutdown_net(_fed.net_for_inbound_p2p_connections[i], false);
// Ignore errors. Mark the network abstraction closed.
_fed.net_for_inbound_p2p_connections[i] = NULL;
close_net(_fed.net_for_inbound_p2p_connections[i], false);
}

// Check for all outgoing physical connections in
// _fed.net_for_outbound_p2p_connections and
// if the network abstraction ID is not NULL, the connection is still open.
// Send an EOF by closing the network abstraction here.
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {

// Close outbound connections, in case they have not closed themselves.
// This will result in EOF being sent to the remote federate, except for
// abnormal termination, in which case it will just close the network abstraction.
close_outbound_net(i);
}

Expand All @@ -1681,6 +1679,14 @@ void lf_terminate_execution(environment_t* env) {
// Wait for the thread listening for messages from the RTI to close.
lf_thread_join(_fed.RTI_net_listener, NULL);

// All listener threads have now exited. Safe to free network abstraction memory.
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {
free_net(_fed.net_for_inbound_p2p_connections[i]);
_fed.net_for_inbound_p2p_connections[i] = NULL;
}
free_net(_fed.net_to_RTI);
_fed.net_to_RTI = NULL;

// For abnormal termination, there is no need to free memory.
if (_lf_normal_termination) {
LF_PRINT_DEBUG("Freeing memory occupied by the federate.");
Expand Down
35 changes: 33 additions & 2 deletions network/api/net_abstraction.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,44 @@ void write_to_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, uns
bool is_net_open(net_abstraction_t net_abs);

/**
* @brief Gracefully shut down and close a network abstraction.
* @brief Close the underlying connection of a network abstraction without freeing its memory.
* @ingroup Network
*
* This function closes the connection (making it unusable for I/O) but leaves the
* net_abstraction_t memory allocated. This is needed when another thread may still hold
* a pointer to the same network abstraction; the socket is closed to unblock any pending
* reads, but the memory remains valid. Call free_net() to release the memory after ensuring
* no other thread holds a reference. This function is idempotent.
*
* @param net_abs The network abstraction whose connection should be closed, or NULL (no-op).
* @param read_before_closing If true, read until EOF before closing.
* @return int Returns 0 on success, -1 on failure.
*/
int close_net(net_abstraction_t net_abs, bool read_before_closing);

/**
* @brief Free the memory associated with a network abstraction.
* @ingroup Network
*
* The connection should already have been closed via close_net() or shutdown_net().
* Safe to call with NULL (no-op).
*
* @param net_abs The network abstraction to free, or NULL.
*/
void free_net(net_abstraction_t net_abs);

/**
* @brief Gracefully shut down, close, and free a network abstraction.
* @ingroup Network
*
* Equivalent to calling close_net() followed by free_net(). Do not use this function
* if another thread may still be using the same net_abstraction_t pointer; use close_net()
* to unblock the other thread first, join it, and then call free_net().
*
* If read_before_closing is false, call shutdown() with SHUT_RDWR and then close(). If true, call shutdown() with
* SHUT_WR, then read() until EOF and discard received bytes before closing.
*
* @param net_abs The network abstraction to shut down and close.
* @param net_abs The network abstraction to shut down and close, or NULL (no-op).
* @param read_before_closing If true, read until EOF before closing the network abstraction.
* @return int Returns 0 on success, -1 on failure (errno will indicate the error).
*/
Expand Down
9 changes: 6 additions & 3 deletions network/impl/src/lf_socket_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ bool is_net_open(net_abstraction_t net_abs) {
return is_socket_open(priv->socket_descriptor);
}

int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) {
int close_net(net_abstraction_t net_abs, bool read_before_closing) {
if (net_abs == NULL) {
LF_PRINT_LOG("Socket already closed.");
return 0;
}
socket_priv_t* priv = (socket_priv_t*)net_abs;
int ret = shutdown_socket(&priv->socket_descriptor, read_before_closing);
return shutdown_socket(&priv->socket_descriptor, read_before_closing);
}

int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) {
int ret = close_net(net_abs, read_before_closing);
free_net(net_abs);
return ret;
}
26 changes: 13 additions & 13 deletions util/initialize_from_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ extern "C" {
*
* To use this to initialize parameters and/or state variables of a reactor, you can do the following:
*
* ```lf-c
* ```lf
* main reactor MyReactor(x: double = 0.0, row_number: int = 0) {
* state y: double = 0.0;
* reaction(startup) {=
Expand All @@ -67,7 +67,7 @@ extern "C" {
* If you wish to initialize parameters or state variables of a reactor within a bank, you
* can create a CSV file with one row per bank member and use the `bank_index` parameter to
* select the row to read. For example:
* ```lf-c
* ```lf
* reactor MyReactor(bank_index: int = 0) {
* reaction(startup) {=
* lf_initialize_double("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL);
Expand All @@ -79,7 +79,7 @@ extern "C" {
* This way, each bank member can have a different set of parameter values.
*
* To use this in a Lingua Franca program, you must include the following in the target declaration:
* ```lf-c
* ```lf
* target C {
* cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake",
* files: [
Expand All @@ -89,7 +89,7 @@ extern "C" {
* ```
*
* Then, in the reactor, you can include the header file as follows:
* ```lf-c
* ```lf
* reactor MyReactor {
* preamble {=
* #include "initialize_from_file.h"
Expand Down Expand Up @@ -134,7 +134,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number
*
* To use this to initialize parameters and/or state variables of a reactor, you can do the following:
*
* ```lf-c
* ```lf
* main reactor MyReactor(x: int = 0, row_number: int = 0) {
* state y: int = 0;
* reaction(startup) {=
Expand All @@ -154,7 +154,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number
* If you wish to initialize parameters or state variables of a reactor within a bank, you
* can create a CSV file with one row per bank member and use the `bank_index` parameter to
* select the row to read. For example:
* ```lf-c
* ```lf
* reactor MyReactor(bank_index: int = 0) {
* reaction(startup) {=
* lf_initialize_int("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL);
Expand All @@ -166,7 +166,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number
* This way, each bank member can have a different set of parameter values.
*
* To use this in a Lingua Franca program, you must include the following in the target declaration:
* ```lf-c
* ```lf
* target C {
* cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake",
* files: [
Expand All @@ -176,7 +176,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number
* ```
*
* Then, in the reactor, you can include the header file as follows:
* ```lf-c
* ```lf
* reactor MyReactor {
* preamble {=
* #include "initialize_from_file.h"
Expand Down Expand Up @@ -206,7 +206,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, .
*
* This macro is meant to be called from a reaction, not directly. If you wish to call it from
* somewhere other than a reaction, you can use the following function:
* ```lf-c
* ```lf
* int _lf_initialize_string(const char* filename, char delimiter, size_t row_number,
* struct allocation_record_t** allocations, ...);
* ```
Expand Down Expand Up @@ -238,7 +238,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, .
*
* To use this to initialize parameters and/or state variables of a reactor, you can do the following:
*
* ```lf-c
* ```lf
* main reactor MyReactor(row_number: int = 0) {
* state name: string = "";
* reaction(startup) {=
Expand All @@ -258,7 +258,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, .
* If you wish to initialize parameters or state variables of a reactor within a bank, you
* can create a CSV file with one row per bank member and use the `bank_index` parameter to
* select the row to read. For example:
* ```lf-c
* ```lf
* reactor MyReactor(bank_index: int = 0) {
* reaction(startup) {=
* lf_initialize_string("params.csv", ',', self->bank_index + 1, &self->name, NULL);
Expand All @@ -270,7 +270,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, .
* This way, each bank member can have a different set of parameter values.
*
* To use this in a Lingua Franca program, you must include the following in the target declaration:
* ```lf-c
* ```lf
* target C {
* cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake",
* files: [
Expand All @@ -280,7 +280,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, .
* ```
*
* Then, in the reactor, you can include the header file as follows:
* ```lf-c
* ```lf
* reactor MyReactor {
* preamble {=
* #include "initialize_from_file.h"
Expand Down
Loading