diff --git a/core/federated/federate.c b/core/federated/federate.c index c5a94ae4e..9c74ee2ba 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -641,11 +641,16 @@ static int handle_tagged_message(net_abstraction_t net, int fed_id) { " Discarding message and closing the network connection.", env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time, intended_tag.microstep); - // Free the allocated memory before returning - _lf_done_using(message_token); - // Close network abstraction, reading any incoming data and discarding it. - shutdown_net(_fed.net_for_inbound_p2p_connections[fed_id], false); - _fed.net_for_inbound_p2p_connections[fed_id] = NULL; + // The token was freshly allocated by _lf_new_token with ref_count 0 and was never + // scheduled, so _lf_done_using would incorrectly treat it as already freed. + // Use _lf_free_token directly, which handles ref_count == 0 correctly. + _lf_free_token(message_token); +#ifdef FEDERATED_DECENTRALIZED + _lf_decrement_tag_barrier_locked(env); +#endif + // Close the connection to unblock the listener, but do not free the memory; + // lf_terminate_execution will free it after joining the listener thread. + close_net(net, false); LF_MUTEX_UNLOCK(&env->mutex); return -1; } else { @@ -1508,8 +1513,8 @@ static void* listen_to_rti_net(void* args) { // Listen for messages from the federate. while (!_lf_termination_executed) { // Check whether the RTI network abstraction is still valid. - if (_fed.net_to_RTI == NULL) { - lf_print_warning("network abstraction to the RTI unexpectedly closed."); + if (_fed.net_to_RTI == NULL || !is_net_open(_fed.net_to_RTI)) { + lf_print_warning("network connection to the RTI unexpectedly closed."); return NULL; } // Read one byte to get the message type. @@ -1517,14 +1522,12 @@ static void* listen_to_rti_net(void* args) { int read_failed = read_from_net(_fed.net_to_RTI, 1, buffer); if (read_failed < 0) { lf_print_error("Connection to the RTI was closed by the RTI with an error. Considering this a soft error."); - shutdown_net(_fed.net_to_RTI, false); - _fed.net_to_RTI = NULL; + close_net(_fed.net_to_RTI, false); return NULL; } else if (read_failed > 0) { // EOF received. lf_print_info("Connection to the RTI closed with an EOF."); - shutdown_net(_fed.net_to_RTI, false); - _fed.net_to_RTI = NULL; + close_net(_fed.net_to_RTI, false); return NULL; } switch (buffer[0]) { @@ -1647,11 +1650,10 @@ void lf_terminate_execution(environment_t* env) { } LF_PRINT_DEBUG("Closing incoming P2P network abstractions."); - // Close any incoming P2P network abstractions that are still open. + // Close connections to unblock any listener threads that are blocking on reads, + // but do NOT free the memory yet because listener threads hold local pointers. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - shutdown_net(_fed.net_for_inbound_p2p_connections[i], false); - // Ignore errors. Mark the network abstraction closed. - _fed.net_for_inbound_p2p_connections[i] = NULL; + close_net(_fed.net_for_inbound_p2p_connections[i], false); } // Check for all outgoing physical connections in @@ -1659,10 +1661,6 @@ void lf_terminate_execution(environment_t* env) { // if the network abstraction ID is not NULL, the connection is still open. // Send an EOF by closing the network abstraction here. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - - // Close outbound connections, in case they have not closed themselves. - // This will result in EOF being sent to the remote federate, except for - // abnormal termination, in which case it will just close the network abstraction. close_outbound_net(i); } @@ -1681,6 +1679,14 @@ void lf_terminate_execution(environment_t* env) { // Wait for the thread listening for messages from the RTI to close. lf_thread_join(_fed.RTI_net_listener, NULL); + // All listener threads have now exited. Safe to free network abstraction memory. + for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { + free_net(_fed.net_for_inbound_p2p_connections[i]); + _fed.net_for_inbound_p2p_connections[i] = NULL; + } + free_net(_fed.net_to_RTI); + _fed.net_to_RTI = NULL; + // For abnormal termination, there is no need to free memory. if (_lf_normal_termination) { LF_PRINT_DEBUG("Freeing memory occupied by the federate."); diff --git a/network/api/net_abstraction.h b/network/api/net_abstraction.h index 884d6bb16..0317a6b3a 100644 --- a/network/api/net_abstraction.h +++ b/network/api/net_abstraction.h @@ -204,13 +204,44 @@ void write_to_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, uns bool is_net_open(net_abstraction_t net_abs); /** - * @brief Gracefully shut down and close a network abstraction. + * @brief Close the underlying connection of a network abstraction without freeing its memory. * @ingroup Network * + * This function closes the connection (making it unusable for I/O) but leaves the + * net_abstraction_t memory allocated. This is needed when another thread may still hold + * a pointer to the same network abstraction; the socket is closed to unblock any pending + * reads, but the memory remains valid. Call free_net() to release the memory after ensuring + * no other thread holds a reference. This function is idempotent. + * + * @param net_abs The network abstraction whose connection should be closed, or NULL (no-op). + * @param read_before_closing If true, read until EOF before closing. + * @return int Returns 0 on success, -1 on failure. + */ +int close_net(net_abstraction_t net_abs, bool read_before_closing); + +/** + * @brief Free the memory associated with a network abstraction. + * @ingroup Network + * + * The connection should already have been closed via close_net() or shutdown_net(). + * Safe to call with NULL (no-op). + * + * @param net_abs The network abstraction to free, or NULL. + */ +void free_net(net_abstraction_t net_abs); + +/** + * @brief Gracefully shut down, close, and free a network abstraction. + * @ingroup Network + * + * Equivalent to calling close_net() followed by free_net(). Do not use this function + * if another thread may still be using the same net_abstraction_t pointer; use close_net() + * to unblock the other thread first, join it, and then call free_net(). + * * If read_before_closing is false, call shutdown() with SHUT_RDWR and then close(). If true, call shutdown() with * SHUT_WR, then read() until EOF and discard received bytes before closing. * - * @param net_abs The network abstraction to shut down and close. + * @param net_abs The network abstraction to shut down and close, or NULL (no-op). * @param read_before_closing If true, read until EOF before closing the network abstraction. * @return int Returns 0 on success, -1 on failure (errno will indicate the error). */ diff --git a/network/impl/src/lf_socket_support.c b/network/impl/src/lf_socket_support.c index 76aee2308..ce22590b0 100644 --- a/network/impl/src/lf_socket_support.c +++ b/network/impl/src/lf_socket_support.c @@ -176,13 +176,16 @@ bool is_net_open(net_abstraction_t net_abs) { return is_socket_open(priv->socket_descriptor); } -int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { +int close_net(net_abstraction_t net_abs, bool read_before_closing) { if (net_abs == NULL) { - LF_PRINT_LOG("Socket already closed."); return 0; } socket_priv_t* priv = (socket_priv_t*)net_abs; - int ret = shutdown_socket(&priv->socket_descriptor, read_before_closing); + return shutdown_socket(&priv->socket_descriptor, read_before_closing); +} + +int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { + int ret = close_net(net_abs, read_before_closing); free_net(net_abs); return ret; } diff --git a/util/initialize_from_file.h b/util/initialize_from_file.h index 595902dee..73510d571 100644 --- a/util/initialize_from_file.h +++ b/util/initialize_from_file.h @@ -47,7 +47,7 @@ extern "C" { * * To use this to initialize parameters and/or state variables of a reactor, you can do the following: * - * ```lf-c + * ```lf * main reactor MyReactor(x: double = 0.0, row_number: int = 0) { * state y: double = 0.0; * reaction(startup) {= @@ -67,7 +67,7 @@ extern "C" { * If you wish to initialize parameters or state variables of a reactor within a bank, you * can create a CSV file with one row per bank member and use the `bank_index` parameter to * select the row to read. For example: - * ```lf-c + * ```lf * reactor MyReactor(bank_index: int = 0) { * reaction(startup) {= * lf_initialize_double("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL); @@ -79,7 +79,7 @@ extern "C" { * This way, each bank member can have a different set of parameter values. * * To use this in a Lingua Franca program, you must include the following in the target declaration: - * ```lf-c + * ```lf * target C { * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", * files: [ @@ -89,7 +89,7 @@ extern "C" { * ``` * * Then, in the reactor, you can include the header file as follows: - * ```lf-c + * ```lf * reactor MyReactor { * preamble {= * #include "initialize_from_file.h" @@ -134,7 +134,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number * * To use this to initialize parameters and/or state variables of a reactor, you can do the following: * - * ```lf-c + * ```lf * main reactor MyReactor(x: int = 0, row_number: int = 0) { * state y: int = 0; * reaction(startup) {= @@ -154,7 +154,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number * If you wish to initialize parameters or state variables of a reactor within a bank, you * can create a CSV file with one row per bank member and use the `bank_index` parameter to * select the row to read. For example: - * ```lf-c + * ```lf * reactor MyReactor(bank_index: int = 0) { * reaction(startup) {= * lf_initialize_int("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL); @@ -166,7 +166,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number * This way, each bank member can have a different set of parameter values. * * To use this in a Lingua Franca program, you must include the following in the target declaration: - * ```lf-c + * ```lf * target C { * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", * files: [ @@ -176,7 +176,7 @@ int lf_initialize_double(const char* filename, char delimiter, size_t row_number * ``` * * Then, in the reactor, you can include the header file as follows: - * ```lf-c + * ```lf * reactor MyReactor { * preamble {= * #include "initialize_from_file.h" @@ -206,7 +206,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, . * * This macro is meant to be called from a reaction, not directly. If you wish to call it from * somewhere other than a reaction, you can use the following function: - * ```lf-c + * ```lf * int _lf_initialize_string(const char* filename, char delimiter, size_t row_number, * struct allocation_record_t** allocations, ...); * ``` @@ -238,7 +238,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, . * * To use this to initialize parameters and/or state variables of a reactor, you can do the following: * - * ```lf-c + * ```lf * main reactor MyReactor(row_number: int = 0) { * state name: string = ""; * reaction(startup) {= @@ -258,7 +258,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, . * If you wish to initialize parameters or state variables of a reactor within a bank, you * can create a CSV file with one row per bank member and use the `bank_index` parameter to * select the row to read. For example: - * ```lf-c + * ```lf * reactor MyReactor(bank_index: int = 0) { * reaction(startup) {= * lf_initialize_string("params.csv", ',', self->bank_index + 1, &self->name, NULL); @@ -270,7 +270,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, . * This way, each bank member can have a different set of parameter values. * * To use this in a Lingua Franca program, you must include the following in the target declaration: - * ```lf-c + * ```lf * target C { * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", * files: [ @@ -280,7 +280,7 @@ int lf_initialize_int(const char* filename, char delimiter, size_t row_number, . * ``` * * Then, in the reactor, you can include the header file as follows: - * ```lf-c + * ```lf * reactor MyReactor { * preamble {= * #include "initialize_from_file.h"