diff --git a/sc-memory/sc-core/include/sc-core/sc_types.h b/sc-memory/sc-core/include/sc-core/sc_types.h index d820bb7ee..e837bceaa 100644 --- a/sc-memory/sc-core/include/sc-core/sc_types.h +++ b/sc-memory/sc-core/include/sc-core/sc_types.h @@ -231,7 +231,7 @@ typedef sc_uint16 sc_type; typedef sc_uint16 sc_states; # define SC_STATE_REQUEST_ERASURE 0x1 -# define SC_STATE_IS_ERASABLE 0x200 +# define SC_STATE_IS_UNDER_ERASURE 0x200 # define SC_STATE_ELEMENT_EXIST 0x2 // results diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h index 2f86ba349..f2c0e260e 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h @@ -50,7 +50,7 @@ sc_result sc_event_notify_element_deleted(sc_addr addr); * @param subscription_addr sc-addr of element that emitting event * @param event_type_addr Emitting event type * @param connector_addr A sc-address of added/removed sc-connector (just for specified events) - * @param edge_type A sc-type of added/removed sc-connector (just for specified events) + * @param connector_type A sc-type of added/removed sc-connector (just for specified events) * @param other_addr A sc-address of the second sc-element of sc-connector. If \p subscription_addr is a source, then \p * other_addr is a target. If \p subscription_addr is a target, then \p other_addr is a source. * @param callback A pointer function that is executed after the execution of a function that was called on the @@ -63,7 +63,7 @@ sc_result sc_event_emit( sc_addr subscription_addr, sc_event_type event_type_addr, sc_addr connector_addr, - sc_type edge_type, + sc_type connector_type, sc_addr other_addr, sc_event_do_after_callback callback, sc_addr event_addr); diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c index 9972ac462..04c4fa835 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c @@ -6,6 +6,8 @@ #include "sc_event_queue.h" +#include + #include "sc-core/sc_event_subscription.h" #include "sc_event_private.h" @@ -13,6 +15,7 @@ #include "sc-store/sc_storage_private.h" #include "sc_memory_private.h" #include "sc-core/sc_memory.h" +#include "sc-core/sc_keynodes.h" #include "sc-core/sc-base/sc_allocator.h" @@ -64,22 +67,22 @@ void _sc_event_emission_pool_worker_data_destroy(sc_event * data) void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) { sc_event * event = (sc_event *)data; - sc_event_emission_manager * queue = user_data; + sc_event_emission_manager * manager = user_data; sc_event_subscription * event_subscription = event->event_subscription; if (event_subscription == null_ptr) goto destroy; - sc_monitor_acquire_read(&queue->destroy_monitor); + sc_monitor_acquire_read(&manager->destroy_monitor); - if (queue->running == SC_FALSE) + if (manager->running == SC_FALSE) goto end; sc_monitor_acquire_read(&event_subscription->monitor); if (sc_event_subscription_is_deletable(event_subscription)) { sc_monitor_release_read(&event_subscription->monitor); - goto end; + goto destroy; } sc_event_callback callback = event_subscription->callback; @@ -95,12 +98,40 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) sc_storage_end_new_process(); + if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) + { + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) + ? event->event_addr + : event->connector_addr; + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + if (count != 0) + { + --count; + if (count == 0) + sc_hash_table_remove(manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + else + sc_hash_table_insert( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); + } + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + } + sc_monitor_release_read(&event_subscription->monitor); end: - sc_monitor_release_read(&queue->destroy_monitor); + sc_monitor_release_read(&manager->destroy_monitor); destroy: { + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + --manager->current_emitted_events_count; + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + if (event->callback != null_ptr) { sc_memory_context * ctx = sc_memory_context_new_ext(event->user_addr); @@ -112,6 +143,16 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) } } +guint emitted_events_hash_func(gconstpointer pointer) +{ + return GPOINTER_TO_UINT(pointer); +} + +gboolean emitted_events_equal_func(gconstpointer a, gconstpointer b) +{ + return (a == b); +} + void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, sc_memory_params const * params) { *manager = sc_mem_new(sc_event_emission_manager, 1); @@ -131,9 +172,12 @@ void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, } (*manager)->running = SC_TRUE; - sc_monitor_init(&(*manager)->destroy_monitor); sc_monitor_init(&(*manager)->pool_monitor); + (*manager)->current_emitted_events_count = 0; + (*manager)->emitted_erase_events = + sc_hash_table_init(emitted_events_hash_func, emitted_events_equal_func, null_ptr, null_ptr); + sc_monitor_init(&(*manager)->emitted_erase_events_monitor); (*manager)->thread_pool = g_thread_pool_new( _sc_event_emission_pool_worker, *manager, @@ -147,18 +191,8 @@ void sc_event_emission_manager_stop(sc_event_emission_manager * manager) if (manager == null_ptr) return; - sc_bool is_running = SC_FALSE; - - sc_monitor_acquire_read(&manager->destroy_monitor); - is_running = manager->running; - sc_monitor_release_read(&manager->destroy_monitor); - - if (is_running) - { - sc_monitor_acquire_write(&manager->destroy_monitor); + if (manager->running) manager->running = SC_FALSE; - sc_monitor_release_write(&manager->destroy_monitor); - } } void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) @@ -166,6 +200,8 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) if (manager == null_ptr) return; + // Acquire write lock once for all operations + sc_monitor_acquire_write(&manager->destroy_monitor); sc_monitor_acquire_write(&manager->pool_monitor); if (manager->thread_pool) { @@ -173,6 +209,17 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) manager->thread_pool = null_ptr; } + // Wait for current events to finish with a timeout + sc_int32 const MAX_WAIT_ITERATIONS = 1000; + sc_int32 wait_iterations = 0; + while (manager->current_emitted_events_count > 0 && wait_iterations < MAX_WAIT_ITERATIONS) + { + sc_monitor_release_write(&manager->destroy_monitor); + usleep(10); + sc_monitor_acquire_write(&manager->destroy_monitor); + wait_iterations++; + } + while (!sc_queue_empty(&manager->deletable_events_subscriptions)) { sc_event_subscription * event_subscription = sc_queue_pop(&manager->deletable_events_subscriptions); @@ -185,6 +232,7 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) sc_monitor_destroy(&manager->pool_monitor); sc_monitor_destroy(&manager->destroy_monitor); + sc_hash_table_destroy(manager->emitted_erase_events); sc_mem_free(manager); } @@ -198,13 +246,38 @@ void _sc_event_emission_manager_add( sc_event_do_after_callback callback, sc_addr event_addr) { - if (manager == null_ptr) - return; - sc_event * event = _sc_event_new(event_subscription, user_addr, connector_addr, connector_type, other_addr, callback, event_addr); sc_monitor_acquire_write(&manager->pool_monitor); + if (manager->thread_pool == null_ptr) + { + sc_monitor_release_write(&manager->pool_monitor); + return; + } + + if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) + { + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + sc_addr key_addr = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) + ? event_subscription->subscription_addr + : connector_addr; + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr))); + ++count; + sc_hash_table_insert( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr)), GUINT_TO_POINTER(count)); + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + } + + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + ++manager->current_emitted_events_count; + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + g_thread_pool_push(manager->thread_pool, event, null_ptr); sc_monitor_release_write(&manager->pool_monitor); } diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h index a7ed9a4d3..e7613e315 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h @@ -32,6 +32,11 @@ typedef struct sc_monitor destroy_monitor; ///< Monitor for synchronizing access to the destruction process. GThreadPool * thread_pool; ///< Thread pool used for worker threads processing events. sc_monitor pool_monitor; ///< Monitor for synchronizing access to the thread pool. + sc_uint32 current_emitted_events_count; ///< Current count of emitted events. + sc_hash_table * emitted_erase_events; ///< Table that stores amount of active event subscriptions that were initiated + ///< due to erasure of sc-element which sc-addr is stored as a key. + sc_monitor emitted_erase_events_monitor; ///< Monitor for synchronizing current_emitted_events_count and + ///< emitted_erase_events. } sc_event_emission_manager; /*! Function that initializes an sc-event emission manager. diff --git a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c index 75cbbfa92..acd748182 100644 --- a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c +++ b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c @@ -51,6 +51,7 @@ sc_result _sc_event_subscription_manager_add( sc_event_subscription_manager * manager, sc_event_subscription * event_subscription) { + sc_result result = SC_RESULT_OK; sc_hash_table_list * element_events_list = null_ptr; // the first, if table doesn't exist, then return error @@ -58,23 +59,22 @@ sc_result _sc_event_subscription_manager_add( return SC_RESULT_NO; sc_monitor_acquire_write(&manager->events_table_monitor); - if (manager->events_table == null_ptr) { - sc_monitor_release_write(&manager->events_table_monitor); - return SC_RESULT_NO; + result = SC_RESULT_OK; + goto result; } - // if there are no events for specified sc-element, then generate new events list + // if there are no events for specified sc-element, then create new events list element_events_list = (sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr)); element_events_list = sc_hash_table_list_append(element_events_list, (sc_pointer)event_subscription); sc_hash_table_insert( manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list); +result: sc_monitor_release_write(&manager->events_table_monitor); - - return SC_RESULT_OK; + return result; } /*! Removes the specified sc-event_subscription from the registration manager's events table. @@ -86,6 +86,7 @@ sc_result _sc_event_subscription_manager_remove( sc_event_subscription_manager * manager, sc_event_subscription * event_subscription) { + sc_result result = SC_RESULT_OK; sc_hash_table_list * element_events_list = null_ptr; // the first, if table doesn't exist, then return error @@ -96,10 +97,10 @@ sc_result _sc_event_subscription_manager_remove( element_events_list = (sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr)); if (element_events_list == null_ptr) - goto error; - - if (manager->events_table == null_ptr) - goto error; + { + result = SC_RESULT_ERROR_INVALID_PARAMS; + goto result; + } // remove event_subscription from list of events for specified sc-element element_events_list = sc_hash_table_list_remove(element_events_list, (sc_const_pointer)event_subscription); @@ -109,11 +110,9 @@ sc_result _sc_event_subscription_manager_remove( sc_hash_table_insert( manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list); +result: sc_monitor_release_write(&manager->events_table_monitor); - return SC_RESULT_OK; -error: - sc_monitor_release_write(&manager->events_table_monitor); - return SC_RESULT_ERROR_INVALID_PARAMS; + return result; } void sc_event_subscription_manager_initialize(sc_event_subscription_manager ** manager) @@ -205,7 +204,12 @@ sc_result sc_event_subscription_destroy(sc_event_subscription * event_subscripti return SC_RESULT_NO; sc_event_subscription_manager * subscription_manager = sc_storage_get_event_subscription_manager(); + if (subscription_manager == null_ptr) + return SC_RESULT_NO; + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + if (subscription_manager == null_ptr) + return SC_RESULT_NO; sc_monitor_acquire_write(&event_subscription->monitor); if (_sc_event_subscription_manager_remove(subscription_manager, event_subscription) != SC_RESULT_OK) @@ -250,16 +254,11 @@ sc_result sc_event_notify_element_deleted(sc_addr element) if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr) goto result; - // TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing. // lookup for all registered to specified sc-element events sc_monitor_acquire_write(&subscription_manager->events_table_monitor); - if (subscription_manager != null_ptr) - { - element_events_list = - (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element)); - if (element_events_list != null_ptr) - sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element)); - } + element_events_list = (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element)); + if (element_events_list != null_ptr) + sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element)); if (element_events_list != null_ptr) { @@ -304,7 +303,7 @@ sc_result sc_event_emit( if (_sc_memory_context_are_events_pending(ctx)) { - _sc_memory_context_pend_event(ctx, event_type_addr, subscription_addr, connector_addr, connector_type, other_addr); + _sc_memory_context_pend_event(ctx, subscription_addr, event_type_addr, connector_addr, connector_type, other_addr); return SC_RESULT_OK; } @@ -333,12 +332,10 @@ sc_result sc_event_emit_impl( if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr) goto result; - // TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing. // lookup for all registered to specified sc-element events sc_monitor_acquire_read(&subscription_manager->events_table_monitor); - if (subscription_manager != null_ptr) - element_events_list = - (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr)); + element_events_list = + (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr)); while (element_events_list != null_ptr) { diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 36c7594bd..5be785d96 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -669,198 +669,314 @@ sc_result _sc_storage_element_erase(sc_addr addr) return result; } -sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +void _sc_storage_cache_elements_under_erasure_without_erase_events( + sc_addr addr, + sc_hash_table * incident_elements_under_erasure, + sc_hash_table * processed_elements) { - sc_result result; + sc_pointer key = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr)); + if (sc_hash_table_get(incident_elements_under_erasure, key) != null_ptr) + return; + if (sc_hash_table_get(processed_elements, key) != null_ptr) + return; + + sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, addr); + sc_monitor_acquire_read(monitor); + sc_element * element; + sc_result result = sc_storage_get_element_by_addr(addr, &element); + if (result == SC_RESULT_OK && (element->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE + && (element->flags.states & SC_STATE_REQUEST_ERASURE) != SC_STATE_REQUEST_ERASURE) + { + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + if (emission_manager != null_ptr) + { + sc_monitor_acquire_read(&emission_manager->pool_monitor); + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); + if (count == 0) + sc_hash_table_insert(incident_elements_under_erasure, key, element); + sc_monitor_release_read(&emission_manager->pool_monitor); + } + } + sc_monitor_release_read(monitor); +} +sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( + sc_memory_context const * ctx, + sc_addr connector_chain_begin_addr, + sc_addr addr, + sc_hash_table * processed_connectors, + sc_bool * does_branch_have_emitted_events, + sc_list * elements_that_can_be_erased, + sc_hash_table * incident_nodes_under_erasure) +{ + sc_result result; sc_element * el = null_ptr; + sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, addr); + sc_monitor_acquire_write(monitor); result = sc_storage_get_element_by_addr(addr, &el); if (result != SC_RESULT_OK) - goto error; - - sc_hash_table * cache_table = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); - - sc_queue iter_queue; - sc_queue_init(&iter_queue); - sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr)); - sc_queue_push(&iter_queue, p_addr); + { + sc_monitor_release_write(monitor); + return result; + } + // if element wasn't erased before then erase events should be emitted, otherwise there should be a check for started + // and not finished erase events callbacks + sc_bool const was_element_erased_before = (el->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE; + if (!was_element_erased_before) + el->flags.states |= SC_STATE_IS_UNDER_ERASURE; + sc_monitor_release_write(monitor); - sc_queue addrs_with_not_emitted_erase_events; - sc_queue_init(&addrs_with_not_emitted_erase_events); - while (!sc_queue_empty(&iter_queue)) + sc_monitor_acquire_read(monitor); + if ((el->flags.states & SC_STATE_REQUEST_ERASURE) == SC_STATE_REQUEST_ERASURE) { - p_addr = sc_queue_pop(&iter_queue); + sc_monitor_release_read(monitor); + return SC_RESULT_OK; + } + + sc_type const type = el->flags.type; + sc_addr const begin_addr = el->arc.begin; + sc_addr const end_addr = el->arc.end; - sc_addr element_addr; - element_addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT((sc_pointer_to_sc_addr_hash)p_addr); - element_addr.offset = SC_ADDR_LOCAL_OFFSET_FROM_INT((sc_pointer_to_sc_addr_hash)p_addr); + sc_result erase_incoming_connector_result = SC_RESULT_NO; + sc_result erase_outgoing_connector_result = SC_RESULT_NO; + sc_result erase_incoming_arc_result = SC_RESULT_NO; + sc_result erase_outgoing_arc_result = SC_RESULT_NO; + sc_result erase_element_result = SC_RESULT_NO; + sc_bool there_are_active_erase_events_with_addr = SC_FALSE; - sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, element_addr); - sc_monitor_acquire_read(monitor); - result = sc_storage_get_element_by_addr(element_addr, &el); - if (result != SC_RESULT_OK) + if (!was_element_erased_before) + { + if (sc_type_is_connector(type)) { - sc_monitor_release_read(monitor); - continue; + erase_incoming_connector_result = sc_event_emit( + ctx, + begin_addr, + sc_event_before_erase_connector_addr, + addr, + type, + end_addr, + sc_storage_element_erase, + connector_chain_begin_addr); + erase_outgoing_connector_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_connector_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); } - sc_type const type = el->flags.type; - sc_addr const begin_addr = el->arc.begin; - sc_addr const end_addr = el->arc.end; - - sc_result erase_incoming_connector_result = SC_RESULT_NO; - sc_result erase_outgoing_connector_result = SC_RESULT_NO; - sc_result erase_incoming_arc_result = SC_RESULT_NO; - sc_result erase_outgoing_arc_result = SC_RESULT_NO; - sc_result erase_element_result = SC_RESULT_NO; - - if ((el->flags.states & SC_STATE_IS_ERASABLE) != SC_STATE_IS_ERASABLE) + if (sc_type_has_subtype(type, sc_type_common_edge)) { - if ((type & sc_type_connector_mask) != 0) - { - erase_incoming_connector_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_connector_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_outgoing_connector_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_connector_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - - if (sc_type_has_subtype(type, sc_type_common_edge)) - { - erase_incoming_arc_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_edge_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_outgoing_arc_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_edge_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - else if (sc_type_has_subtype_in_mask(type, sc_type_arc_mask)) - { - erase_outgoing_arc_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_outgoing_arc_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_incoming_arc_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_incoming_arc_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - - erase_element_result = sc_event_emit( + erase_incoming_arc_result = sc_event_emit( ctx, - element_addr, - sc_event_before_erase_element_addr, - SC_ADDR_EMPTY, - 0, - SC_ADDR_EMPTY, + begin_addr, + sc_event_before_erase_edge_addr, + addr, + type, + end_addr, sc_storage_element_erase, - element_addr); - - el->flags.states |= SC_STATE_IS_ERASABLE; + connector_chain_begin_addr); + erase_outgoing_arc_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_edge_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); } - - if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK - || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK - || erase_element_result == SC_RESULT_OK) + else if (sc_type_has_subtype_in_mask(type, sc_type_arc_mask)) { - sc_monitor_release_read(monitor); - continue; + erase_outgoing_arc_result = sc_event_emit( + ctx, + begin_addr, + sc_event_before_erase_outgoing_arc_addr, + addr, + type, + end_addr, + sc_storage_element_erase, + connector_chain_begin_addr); + erase_incoming_arc_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_incoming_arc_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); } - sc_queue_push(&addrs_with_not_emitted_erase_events, p_addr); - - sc_addr connector_addr = el->first_out_arc; - while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + erase_element_result = sc_event_emit( + ctx, + addr, + sc_event_before_erase_element_addr, + SC_ADDR_EMPTY, + 0, + SC_ADDR_EMPTY, + sc_storage_element_erase, + connector_chain_begin_addr); + } + else + { + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + if (emission_manager != null_ptr) { - p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); + sc_monitor_acquire_read(&emission_manager->pool_monitor); + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); + if (count != 0) + there_are_active_erase_events_with_addr = SC_TRUE; + sc_monitor_release_read(&emission_manager->pool_monitor); + } + } - sc_element * connector = sc_hash_table_get(cache_table, p_addr); - if (connector == null_ptr) - { - result = sc_storage_get_element_by_addr(connector_addr, &connector); - if (result != SC_RESULT_OK) - break; + if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK + || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK + || erase_element_result == SC_RESULT_OK || there_are_active_erase_events_with_addr) + *does_branch_have_emitted_events = SC_TRUE; - sc_hash_table_insert(cache_table, p_addr, connector); - sc_queue_push(&iter_queue, p_addr); - } + sc_addr connector_addr = el->first_out_arc; + while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + { + sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); + + sc_element * connector = sc_hash_table_get(processed_connectors, p_addr); + if (connector == null_ptr) + { + result = sc_storage_get_element_by_addr(connector_addr, &connector); + if (result != SC_RESULT_OK) + break; - connector_addr = connector->arc.next_begin_out_arc; + sc_hash_table_insert(processed_connectors, p_addr, connector); + _sc_storage_element_erase_with_incoming_outgoing_connectors( + ctx, + connector_chain_begin_addr, + connector_addr, + processed_connectors, + does_branch_have_emitted_events, + elements_that_can_be_erased, + incident_nodes_under_erasure); } - connector_addr = el->first_in_arc; - while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) - { - p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); + connector_addr = connector->arc.next_begin_out_arc; + } - sc_element * connector = sc_hash_table_get(cache_table, p_addr); - if (connector == null_ptr) - { - result = sc_storage_get_element_by_addr(connector_addr, &connector); - if (result != SC_RESULT_OK) - break; + connector_addr = el->first_in_arc; + while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + { + sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); - sc_hash_table_insert(cache_table, p_addr, connector); - sc_queue_push(&iter_queue, p_addr); - } + sc_element * connector = sc_hash_table_get(processed_connectors, p_addr); + if (connector == null_ptr) + { + result = sc_storage_get_element_by_addr(connector_addr, &connector); + if (result != SC_RESULT_OK) + break; - connector_addr = connector->arc.next_end_in_arc; + sc_hash_table_insert(processed_connectors, p_addr, connector); + _sc_storage_element_erase_with_incoming_outgoing_connectors( + ctx, + connector_chain_begin_addr, + connector_addr, + processed_connectors, + does_branch_have_emitted_events, + elements_that_can_be_erased, + incident_nodes_under_erasure); } - sc_monitor_release_read(monitor); + connector_addr = connector->arc.next_end_in_arc; } + sc_monitor_release_read(monitor); - sc_queue_destroy(&iter_queue); - sc_hash_table_destroy(cache_table); - - while (!sc_queue_empty(&addrs_with_not_emitted_erase_events)) + // if addr is connector and its source/target is node that is under erasure and does not have emitted erase events + // then cache source/target to try to erase them with their incoming/outgoing connectors + if (sc_type_is_connector(type) && !*does_branch_have_emitted_events) { - sc_addr_hash addr_int = (sc_pointer_to_sc_addr_hash)sc_queue_pop(&addrs_with_not_emitted_erase_events); - addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT(addr_int); - addr.offset = SC_ADDR_LOCAL_OFFSET_FROM_INT(addr_int); + // TODO(NikitaZotov): Provide causal consistency for agents responding to sc-events of erasing sc-elements occurring + // within the same semantic neighbourhood. It should be that 1) agents, reacted to sc-event of erasing sc-elements, + // know about this sc-element until the end of their existence, 2) the agent, that reacted to sc-event of erasing + // sc-element, can view the entire semantic neighbourhood of this sc-element, even if some of sc-connectors in this + // neighbourhood is erased by another agent. + if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, begin_addr)) + _sc_storage_cache_elements_under_erasure_without_erase_events( + begin_addr, incident_nodes_under_erasure, processed_connectors); + if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, end_addr)) + _sc_storage_cache_elements_under_erasure_without_erase_events( + end_addr, incident_nodes_under_erasure, processed_connectors); + } - _sc_storage_element_erase(addr); + if (!*does_branch_have_emitted_events) + sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); + return SC_RESULT_OK; +} + +sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + sc_memory_context const * ctx, + sc_addr erased_element_addr, + sc_addr reason_of_erasure_addr, + sc_hash_table * connectors_added_to_queue) +{ + sc_element * element; + sc_storage_get_element_by_addr(erased_element_addr, &element); + sc_hash_table_insert(connectors_added_to_queue, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(erased_element_addr)), element); + + sc_hash_table * incident_nodes_under_erasure = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_bool does_branch_have_emitted_events = SC_FALSE; + sc_list * elements_that_can_be_erased; + sc_list_init(&elements_that_can_be_erased); + + sc_result result = _sc_storage_element_erase_with_incoming_outgoing_connectors( + ctx, + reason_of_erasure_addr, + erased_element_addr, + connectors_added_to_queue, + &does_branch_have_emitted_events, + elements_that_can_be_erased, + incident_nodes_under_erasure); + + sc_iterator * elements_it = sc_list_iterator(elements_that_can_be_erased); + sc_addr from_hash_addr; + while (sc_iterator_next(elements_it)) + { + sc_addr_hash addr_hash = (sc_pointer_to_sc_addr_hash)sc_iterator_get(elements_it); + SC_ADDR_LOCAL_FROM_INT(addr_hash, from_hash_addr); + _sc_storage_element_erase(from_hash_addr); + } + sc_iterator_destroy(elements_it); + sc_list_destroy(elements_that_can_be_erased); + if (!does_branch_have_emitted_events) + { + sc_hash_table_iterator nodes_to_erase_iterator; + sc_hash_table_iterator_init(&nodes_to_erase_iterator, incident_nodes_under_erasure); + sc_pointer key, value; + while (sc_hash_table_iterator_next(&nodes_to_erase_iterator, &key, &value)) + { + if (sc_hash_table_get(connectors_added_to_queue, key) == null_ptr) + { + sc_addr node_that_was_erased_addr; + SC_ADDR_LOCAL_FROM_INT((sc_pointer_to_sc_addr_hash)key, node_that_was_erased_addr); + _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + ctx, node_that_was_erased_addr, reason_of_erasure_addr, connectors_added_to_queue); + } + } } - sc_queue_destroy(&addrs_with_not_emitted_erase_events); + sc_hash_table_destroy(incident_nodes_under_erasure); + return result; +} - result = SC_RESULT_OK; -error: +sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +{ + sc_hash_table * connectors_added_to_queue = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_result result = _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + ctx, addr, addr, connectors_added_to_queue); + sc_hash_table_destroy(connectors_added_to_queue); return result; } diff --git a/sc-memory/sc-core/src/sc_memory_context_manager.c b/sc-memory/sc-core/src/sc_memory_context_manager.c index 3991e8e4d..e3dc90b96 100644 --- a/sc-memory/sc-core/src/sc_memory_context_manager.c +++ b/sc-memory/sc-core/src/sc_memory_context_manager.c @@ -232,8 +232,8 @@ sc_bool _sc_memory_context_are_events_pending(sc_memory_context const * ctx) void _sc_memory_context_pend_event( sc_memory_context const * ctx, - sc_event_type event_type_addr, sc_addr subscription_addr, + sc_event_type event_type_addr, sc_addr connector_addr, sc_type connector_type, sc_addr other_addr) diff --git a/sc-memory/sc-core/src/sc_memory_context_manager.h b/sc-memory/sc-core/src/sc_memory_context_manager.h index 4113e9094..18696166f 100644 --- a/sc-memory/sc-core/src/sc_memory_context_manager.h +++ b/sc-memory/sc-core/src/sc_memory_context_manager.h @@ -13,6 +13,7 @@ #include "sc-core/sc-base/sc_monitor.h" #include "sc-store/sc-base/sc_message.h" +#include "sc-store/sc-event/sc_event_queue.h" typedef struct _sc_memory_context_manager sc_memory_context_manager; typedef struct _sc_event_emit_params sc_event_emit_params; @@ -104,8 +105,8 @@ void _sc_memory_context_pending_end(sc_memory_context * ctx); /*! Function that adds an event to the pending events list in a sc-memory context. * @param ctx Pointer to the sc-memory context to which the event is added. - * @param type Type of the event to be added. * @param subscription_addr sc_addr representing the sc-element associated with the event. + * @param event_type_addr Type of the event to be added. * @param connector_addr sc-address representing the sc-connector associated with the event. * @param connector_type sc-type representing the sc-connector associated with the event. * @param other_addr sc-address representing the other sc-element associated with the event. @@ -113,8 +114,8 @@ void _sc_memory_context_pending_end(sc_memory_context * ctx); */ void _sc_memory_context_pend_event( sc_memory_context const * ctx, - sc_event_type event_type_addr, sc_addr subscription_addr, + sc_event_type event_type_addr, sc_addr connector_addr, sc_type connector_type, sc_addr other_addr); diff --git a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp index 1ed2a8a2d..e7ebd8134 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp @@ -1940,8 +1940,11 @@ TEST_F(ScSpecifiedAgentTest, ATestSpecifiedAgentGeneratingIncomingArcHasFullSpec ScAddr const & testSetAddr = m_ctx->SearchElementBySystemIdentifier("test_set"); ScAddr const & testOtherSetAddr = m_ctx->SearchElementBySystemIdentifier("test_other_set"); ScAddr const & testRelation = m_ctx->SearchElementBySystemIdentifier("test_relation"); - ScAddr const & edgeAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); - m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, edgeAddr); + { + ScMemoryContextEventsPendingGuard guard(*m_ctx); + ScAddr const & arcAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, arcAddr); + } EXPECT_TRUE(ATestSpecifiedAgent::msWaiter.Wait()); diff --git a/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp b/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp index 70d64367c..0b656c436 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp @@ -541,7 +541,7 @@ TEST_F( ScAddr arcAddr; std::atomic_bool isChecked = false; { - auto eventSubscription = + auto eventSubscriptionForErasure = m_ctx->CreateElementaryEventSubscription>( usersSetAddr, [this, &userContext, &isChecked](ScEventBeforeEraseOutgoingArc const &) @@ -1629,7 +1629,7 @@ TEST_F( ScAddr usersSetEdgeAddr; std::atomic_bool isChecked = false; { - auto eventSubscription = + auto eventSubscriptionForErasure = m_ctx->CreateElementaryEventSubscription>( usersSetAddr, [&](ScEventBeforeEraseOutgoingArc const &) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 91920a4da..235cef6d1 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -105,9 +105,13 @@ bool TestEventSubscriptionGenerateConnector(ScAgentContext * ctx) { nodeAddr2 = ctx->GenerateNode(ScType::ConstNode); EXPECT_TRUE(nodeAddr2.IsValid()); - - arcAddr = ctx->GenerateConnector(eventConnectorType, nodeAddr2, nodeAddr1); - EXPECT_TRUE(arcAddr.IsValid()); + { + // sometimes OnEvent is called before arcAddr is assigned to generated connector, so pending is used to assure + // that arcAddr is assigned before it is used in OnEvent + ScMemoryContextEventsPendingGuard guard(*ctx); + arcAddr = ctx->GenerateConnector(eventConnectorType, nodeAddr2, nodeAddr1); + EXPECT_TRUE(arcAddr.IsValid()); + } }; bool isDone = false; @@ -1367,3 +1371,349 @@ TEST_F(ScEventTest, BlockEventsGuardAndEmitAfter) std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_TRUE(isCalled); } + +TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) +{ + int const sleepTime = 20; + ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&isLongExecutedSubscriptionCalled, this](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + EXPECT_TRUE(m_ctx->IsElement(event.GetArc())); + auto const & [sourceAddr, targetAddr] = m_ctx->GetConnectorIncidentElements(event.GetArc()); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr)); + auto const & [sourceAddr2, targetAddr2] = m_ctx->GetConnectorIncidentElements(targetAddr); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr2)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr2)); + auto const & [sourceAddr3, targetAddr3] = m_ctx->GetConnectorIncidentElements(targetAddr2); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr3)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr3)); + auto const & [sourceAddr4, targetAddr4] = m_ctx->GetConnectorIncidentElements(targetAddr3); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr4)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr4)); + EXPECT_TRUE(m_ctx->GetElementType(targetAddr4).IsNode()); + }); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + + ScAddr const nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr3 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr4 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr5 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const & arcAddr1 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr3, nodeAddr2); + ScAddr const & arcAddr2 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr4, arcAddr1); + ScAddr const & arcAddr3 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr5, arcAddr2); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, arcAddr3); + + m_ctx->EraseElement(nodeAddr2); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, TwoSubscriptionsForNodeErasure) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithTwoSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithTwoSubscriptionsAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithTwoSubscriptionsAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithTwoSubscriptionsAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingFirstAndNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithSubscriptionAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingFirstAndNodeFinishLater) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithSubscriptionAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingSecondAndNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + m_ctx->EraseElement(nodeWithSubscriptionAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingSecondAndNodeFinishLater) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + m_ctx->EraseElement(nodeWithSubscriptionAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr3 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr4 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr5 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr arc1 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, nodeAddr2); + ScAddr arc2 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr3, nodeAddr4); + ScAddr arc3 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, arc2, arc1); + ScAddr arc4 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr5, arc2); + + bool isShortExecutedSubscriptionForArc1Called = false; + auto shortExecutedSubscriptionForArc1 = m_ctx->CreateElementaryEventSubscription( + nodeAddr1, + [&](auto const &) + { + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(arc3)); + + EXPECT_FALSE(isShortExecutedSubscriptionForArc1Called); + isShortExecutedSubscriptionForArc1Called = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + bool isMediumExecutedSubscriptionForArc4Called = false; + auto mediumExecutedSubscriptionForArc4 = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr5, + [&](auto const &) + { + EXPECT_TRUE(m_ctx->IsElement(arc2)); + EXPECT_TRUE(m_ctx->IsElement(arc4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + + EXPECT_FALSE(isMediumExecutedSubscriptionForArc4Called); + isMediumExecutedSubscriptionForArc4Called = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&](auto const &) + { + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + + // TODO(NikitaZotov): Provide causal consistency for agents responding to sc-events of erasing sc-elements + // occurring within the same semantic neighbourhood. It should be that 1) agents, reacted to sc-event of + // erasing sc-elements, know about this sc-element until the end of their existence, 2) the agent, that + // reacted to sc-event of erasing sc-element, can view the entire semantic neighbourhood of this sc-element, + // even if some of sc-connectors in this neighbourhood is erased by another agent. + // EXPECT_TRUE(m_ctx->IsElement(arc3)); + + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeAddr1); + m_ctx->EraseElement(nodeAddr2); + m_ctx->EraseElement(nodeAddr4); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_TRUE(m_ctx->IsElement(arc4)); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_FALSE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(m_ctx->IsElement(arc4)); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr1)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_FALSE(m_ctx->IsElement(arc1)); + EXPECT_FALSE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(m_ctx->IsElement(arc4)); + + EXPECT_TRUE(isShortExecutedSubscriptionForArc1Called); + EXPECT_TRUE(isMediumExecutedSubscriptionForArc4Called); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp index 8b99f0dae..8566e7495 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp @@ -239,7 +239,7 @@ TEST_F(ScWaiterTest, InvalidWaitersWithCondition) m_ctx->CreateConditionWaiter(nodeAddr, {}), utils::ExceptionInvalidParams); } -TEST_F(ScWaiterTest, InvalidEventsFotWaiters) +TEST_F(ScWaiterTest, InvalidEventsForWaiters) { ScAddr nodeAddr = m_ctx->GenerateNode(ScType::ConstNode); ScAddr eventClassAddr; @@ -256,7 +256,7 @@ TEST_F(ScWaiterTest, InvalidEventsFotWaiters) EXPECT_THROW(m_ctx->CreateEventWaiter(eventClassAddr, nodeAddr, {}), utils::ExceptionInvalidParams); } -TEST_F(ScWaiterTest, InvalidEventsFotWaitersWithConditions) +TEST_F(ScWaiterTest, InvalidEventsForWaitersWithConditions) { ScAddr nodeAddr = m_ctx->GenerateNode(ScType::ConstNode); ScAddr eventClassAddr;