diff --git a/Core/SoarKernel/src/episodic_memory/episodic_memory.cpp b/Core/SoarKernel/src/episodic_memory/episodic_memory.cpp index 7da7350610..e700c7f9d2 100644 --- a/Core/SoarKernel/src/episodic_memory/episodic_memory.cpp +++ b/Core/SoarKernel/src/episodic_memory/episodic_memory.cpp @@ -180,6 +180,26 @@ epmem_param_container::epmem_param_container(agent* new_agent): soar_module::par merge->add_mapping(merge_none, "none"); merge->add_mapping(merge_add, "add"); add(merge); + + //////////////////// + // Consolidation + //////////////////// + + // consolidate on/off + consolidate = new soar_module::boolean_param("consolidate", off, new soar_module::f_predicate()); + add(consolidate); + + // consolidate-interval + consolidate_interval = new soar_module::integer_param("consolidate-interval", 100, new soar_module::gt_predicate(0, true), new soar_module::f_predicate()); + add(consolidate_interval); + + // consolidate-threshold + consolidate_threshold = new soar_module::integer_param("consolidate-threshold", 10, new soar_module::gt_predicate(0, true), new soar_module::f_predicate()); + add(consolidate_threshold); + + // consolidate-evict-age: min episode age before eviction eligible (0 = no eviction) + consolidate_evict_age = new soar_module::integer_param("consolidate-evict-age", 0, new soar_module::gt_predicate(0, true), new soar_module::f_predicate()); + add(consolidate_evict_age); } // @@ -599,6 +619,10 @@ epmem_stat_container::epmem_stat_container(agent* new_agent): soar_module::stat_ next_id = new epmem_node_id_stat("next-id", 0, new epmem_db_predicate(thisAgent)); add(next_id); + // last-consolidation + last_consolidation = new epmem_time_id_stat("last-consolidation", 0, new soar_module::f_predicate()); + add(last_consolidation); + // rit-offset-1 rit_offset_1 = new soar_module::integer_stat("rit-offset-1", 0, new epmem_db_predicate(thisAgent)); add(rit_offset_1); @@ -966,6 +990,7 @@ void epmem_graph_statement_container::create_graph_tables() add_structure("CREATE TABLE IF NOT EXISTS epmem_wmes_constant (wc_id INTEGER PRIMARY KEY AUTOINCREMENT,parent_n_id INTEGER,attribute_s_id INTEGER, value_s_id INTEGER)"); add_structure("CREATE TABLE IF NOT EXISTS epmem_wmes_identifier (wi_id INTEGER PRIMARY KEY AUTOINCREMENT,parent_n_id INTEGER,attribute_s_id INTEGER,child_n_id INTEGER, last_episode_id INTEGER)"); add_structure("CREATE TABLE IF NOT EXISTS epmem_ascii (ascii_num INTEGER PRIMARY KEY, ascii_chr TEXT)"); + add_structure("CREATE TABLE IF NOT EXISTS epmem_consolidated (wc_id INTEGER PRIMARY KEY)"); } void epmem_graph_statement_container::create_graph_indices() @@ -1014,6 +1039,7 @@ void epmem_graph_statement_container::drop_graph_tables() add_structure("DROP TABLE IF EXISTS epmem_wmes_identifier_range"); add_structure("DROP TABLE IF EXISTS epmem_wmes_constant"); add_structure("DROP TABLE IF EXISTS epmem_wmes_identifier"); + add_structure("DROP TABLE IF EXISTS epmem_consolidated"); } epmem_graph_statement_container::epmem_graph_statement_container(agent* new_agent): soar_module::sqlite_statement_container(new_agent->EpMem->epmem_db) @@ -1153,6 +1179,43 @@ epmem_graph_statement_container::epmem_graph_statement_container(agent* new_agen update_epmem_wmes_identifier_last_episode_id = new soar_module::sqlite_statement(new_db, "UPDATE epmem_wmes_identifier SET last_episode_id=? WHERE wi_id=?"); add(update_epmem_wmes_identifier_last_episode_id); + // consolidation query: find constant WMEs present for >= threshold episodes, excluding already-consolidated + consolidate_find_stable = new soar_module::sqlite_statement(new_db, + "SELECT wc.wc_id, wc.parent_n_id, wc.attribute_s_id, wc.value_s_id " + "FROM epmem_wmes_constant wc " + "JOIN epmem_wmes_constant_now cn ON cn.wc_id = wc.wc_id " + "LEFT JOIN epmem_consolidated ec ON ec.wc_id = wc.wc_id " + "WHERE cn.start_episode_id <= (? - ?) " + " AND ec.wc_id IS NULL " + "ORDER BY wc.parent_n_id"); + add(consolidate_find_stable); + + // consolidation: mark wc_id as consolidated + consolidate_mark = new soar_module::sqlite_statement(new_db, + "INSERT OR IGNORE INTO epmem_consolidated (wc_id) VALUES (?)"); + add(consolidate_mark); + + // eviction: delete old episode rows and their point entries + consolidate_evict_episode = new soar_module::sqlite_statement(new_db, + "DELETE FROM epmem_episodes WHERE episode_id < ?"); + add(consolidate_evict_episode); + + consolidate_evict_constant_point = new soar_module::sqlite_statement(new_db, + "DELETE FROM epmem_wmes_constant_point WHERE episode_id < ?"); + add(consolidate_evict_constant_point); + + consolidate_evict_identifier_point = new soar_module::sqlite_statement(new_db, + "DELETE FROM epmem_wmes_identifier_point WHERE episode_id < ?"); + add(consolidate_evict_identifier_point); + + consolidate_evict_constant_range = new soar_module::sqlite_statement(new_db, + "DELETE FROM epmem_wmes_constant_range WHERE end_episode_id < ?"); + add(consolidate_evict_constant_range); + + consolidate_evict_identifier_range = new soar_module::sqlite_statement(new_db, + "DELETE FROM epmem_wmes_identifier_range WHERE end_episode_id < ?"); + add(consolidate_evict_identifier_range); + // init statement pools { int j, k, m; @@ -5913,6 +5976,185 @@ void epmem_respond_to_cmd(agent* thisAgent) * Notes : The kernel calls this function to implement Soar-EpMem: * consider new storage and respond to any commands **************************************************************************/ +/*************************************************************************** + * Function : epmem_consolidate + * Author : June Kim + * Notes : Scans episodic memory for stable WME structures and + * writes them to semantic memory. Implements the + * compose+test framework (Casteigts et al., 2019) for + * episodic-to-semantic consolidation. + * + * Compose: union of WMEs active in the current window + * Test: continuous presence >= consolidate-threshold episodes + * Write: create new smem LTI with qualifying augmentations + * + * Runs periodically based on consolidate-interval parameter. + * Off by default (consolidate = off). + **************************************************************************/ +void epmem_consolidate(agent* thisAgent) +{ + // Check if consolidation is enabled + if (thisAgent->EpMem->epmem_params->consolidate->get_value() == off) + { + return; + } + + // Check if epmem DB is connected + if (thisAgent->EpMem->epmem_db->get_status() != soar_module::connected) + { + return; + } + + // Check if smem is enabled (CLI_add will handle connection) + if (!thisAgent->SMem->enabled()) + { + return; + } + + epmem_time_id current_episode = thisAgent->EpMem->epmem_stats->time->get_value(); + epmem_time_id last_consol = thisAgent->EpMem->epmem_stats->last_consolidation->get_value(); + int64_t interval = thisAgent->EpMem->epmem_params->consolidate_interval->get_value(); + int64_t threshold = thisAgent->EpMem->epmem_params->consolidate_threshold->get_value(); + + // Check if enough episodes have passed since last consolidation + if ((current_episode - last_consol) < static_cast(interval)) + { + return; + } + + // Run the compose+test query: find constant WMEs present for >= threshold episodes + // Query now excludes already-consolidated wc_ids via LEFT JOIN + soar_module::sqlite_statement* find_stable = thisAgent->EpMem->epmem_stmts_graph->consolidate_find_stable; + find_stable->bind_int(1, current_episode); + find_stable->bind_int(2, threshold); + + // Collect results grouped by parent, filtering empty symbols before building string + struct consolidation_entry { + epmem_node_id wc_id; + std::string attr; + std::string value; + }; + + std::map> parent_groups; + + while (find_stable->execute() == soar_module::row) + { + epmem_node_id wc_id = find_stable->column_int(0); + epmem_node_id parent_n_id = find_stable->column_int(1); + epmem_hash_id attr_s_id = find_stable->column_int(2); + epmem_hash_id value_s_id = find_stable->column_int(3); + + // Reverse-hash the attribute and value to get printable strings + std::string attr_str, value_str; + epmem_reverse_hash_print(thisAgent, attr_s_id, attr_str); + epmem_reverse_hash_print(thisAgent, value_s_id, value_str); + + if (attr_str.empty() || value_str.empty()) continue; + + // Skip symbols containing pipe characters — they can't be safely quoted + if (attr_str.find('|') != std::string::npos || value_str.find('|') != std::string::npos) + { + continue; + } + + // Pipe-quote strings that contain special characters + auto needs_quoting = [](const std::string& s) { + for (char c : s) { + if (c == ' ' || c == '(' || c == ')' || c == '^' || c == '{' || c == '}') + return true; + } + return false; + }; + + if (needs_quoting(attr_str)) attr_str = "|" + attr_str + "|"; + if (needs_quoting(value_str)) value_str = "|" + value_str + "|"; + + consolidation_entry e; + e.wc_id = wc_id; + e.attr = attr_str; + e.value = value_str; + parent_groups[parent_n_id].push_back(e); + } + find_stable->reinitialize(); + + // Build smem add string and collect wc_ids + std::string smem_add_str; + std::vector consolidated_wc_ids; + int lti_count = 0; + + for (auto& kv : parent_groups) + { + if (kv.second.empty()) continue; + lti_count++; + smem_add_str += "("; + for (auto& e : kv.second) + { + smem_add_str += " ^" + e.attr + " " + e.value; + consolidated_wc_ids.push_back(e.wc_id); + } + smem_add_str += ")\n"; + } + + // Write to smem if we found anything + if (lti_count > 0) + { + std::string* err_msg = new std::string(""); + bool success = thisAgent->SMem->CLI_add(smem_add_str.c_str(), &err_msg); + delete err_msg; + + if (success) + { + // Record consolidated wc_ids to prevent duplicates on next run + for (epmem_node_id wc_id : consolidated_wc_ids) + { + thisAgent->EpMem->epmem_stmts_graph->consolidate_mark->bind_int(1, wc_id); + thisAgent->EpMem->epmem_stmts_graph->consolidate_mark->execute(soar_module::op_reinit); + } + } + } + + // Eviction: remove old episodes if evict-age is set + int64_t evict_age = thisAgent->EpMem->epmem_params->consolidate_evict_age->get_value(); + if (evict_age > 0 && current_episode > static_cast(evict_age)) + { + epmem_time_id evict_before = current_episode - evict_age; + + // Wrap eviction in a transaction if lazy_commit is off + // (when lazy_commit is on, we're already inside a transaction) + bool needs_txn = (thisAgent->EpMem->epmem_params->lazy_commit->get_value() == off); + if (needs_txn) + { + thisAgent->EpMem->epmem_stmts_common->begin->execute(soar_module::op_reinit); + } + + // Delete range entries whose intervals end entirely before the cutoff + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_constant_range->bind_int(1, evict_before); + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_constant_range->execute(soar_module::op_reinit); + + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_identifier_range->bind_int(1, evict_before); + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_identifier_range->execute(soar_module::op_reinit); + + // Delete point entries for old episodes + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_constant_point->bind_int(1, evict_before); + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_constant_point->execute(soar_module::op_reinit); + + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_identifier_point->bind_int(1, evict_before); + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_identifier_point->execute(soar_module::op_reinit); + + // Delete old episode rows + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_episode->bind_int(1, evict_before); + thisAgent->EpMem->epmem_stmts_graph->consolidate_evict_episode->execute(soar_module::op_reinit); + + if (needs_txn) + { + thisAgent->EpMem->epmem_stmts_common->commit->execute(soar_module::op_reinit); + } + } + + // Update last consolidation stat + thisAgent->EpMem->epmem_stats->last_consolidation->set_value(current_episode); +} + void epmem_go(agent* thisAgent, bool allow_store) { @@ -5924,6 +6166,8 @@ void epmem_go(agent* thisAgent, bool allow_store) } epmem_respond_to_cmd(thisAgent); + // Periodic consolidation: episodic -> semantic + epmem_consolidate(thisAgent); thisAgent->EpMem->epmem_timers->total->stop(); diff --git a/Core/SoarKernel/src/episodic_memory/episodic_memory.h b/Core/SoarKernel/src/episodic_memory/episodic_memory.h index 6630a5bda7..e5f9e42be4 100644 --- a/Core/SoarKernel/src/episodic_memory/episodic_memory.h +++ b/Core/SoarKernel/src/episodic_memory/episodic_memory.h @@ -81,6 +81,12 @@ class epmem_param_container: public soar_module::param_container soar_module::constant_param* gm_ordering; soar_module::constant_param* merge; + // consolidation + soar_module::boolean_param* consolidate; + soar_module::integer_param* consolidate_interval; + soar_module::integer_param* consolidate_threshold; + soar_module::integer_param* consolidate_evict_age; + epmem_param_container(agent* new_agent); }; @@ -135,6 +141,8 @@ class epmem_stat_container: public soar_module::stat_container epmem_node_id_stat* next_id; + epmem_time_id_stat* last_consolidation; + soar_module::integer_stat* rit_offset_1; soar_module::integer_stat* rit_left_root_1; soar_module::integer_stat* rit_right_root_1; @@ -329,6 +337,17 @@ class epmem_graph_statement_container: public soar_module::sqlite_statement_cont soar_module::sqlite_statement* update_epmem_wmes_identifier_last_episode_id; + // consolidation + soar_module::sqlite_statement* consolidate_find_stable; + soar_module::sqlite_statement* consolidate_mark; + + // eviction + soar_module::sqlite_statement* consolidate_evict_episode; + soar_module::sqlite_statement* consolidate_evict_constant_point; + soar_module::sqlite_statement* consolidate_evict_identifier_point; + soar_module::sqlite_statement* consolidate_evict_constant_range; + soar_module::sqlite_statement* consolidate_evict_identifier_range; + // soar_module::sqlite_statement_pool* pool_find_edge_queries[2][2]; @@ -471,6 +490,7 @@ extern void epmem_clear_transient_structures(agent* thisAgent); // perform epmem actions extern void epmem_go(agent* thisAgent, bool allow_store = true); +extern void epmem_consolidate(agent* thisAgent); extern bool epmem_backup_db(agent* thisAgent, const char* file_name, std::string* err); extern void epmem_init_db(agent* thisAgent, bool readonly = false); // visualization diff --git a/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidation.soar b/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidation.soar new file mode 100644 index 0000000000..4ea46beb06 --- /dev/null +++ b/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidation.soar @@ -0,0 +1,47 @@ +# Test episodic-to-semantic memory consolidation +# Enable epmem with dc trigger +epmem --set trigger dc +epmem --set learning on + +# Enable smem +smem --set learning on + +# Enable consolidation with short interval for testing +epmem --set consolidate on +epmem --set consolidate-interval 15 +epmem --set consolidate-threshold 10 + +### Initialize: create a stable structure that will persist +sp {propose*init + (state ^superstate nil + -^name) +--> + ( ^operator +) + ( ^name init) +} + +sp {apply*init + (state ^operator.name init) +--> + ( ^name consolidation-test + ^item + ^counter 0) + ( ^color red ^shape circle) +} + +### Count decision cycles to keep the agent running +sp {propose*count + (state ^name consolidation-test + ^counter ) +--> + ( ^operator + =) + ( ^name count) +} + +sp {apply*count + (state ^operator.name count + ^counter ) +--> + ( ^counter -) + ( ^counter (+ 1)) +} diff --git a/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidationEviction.soar b/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidationEviction.soar new file mode 100644 index 0000000000..e86580408b --- /dev/null +++ b/UnitTests/SoarTestAgents/epmem/EpMemFunctionalTests_testConsolidationEviction.soar @@ -0,0 +1,50 @@ +# Test episodic memory eviction after consolidation +# Enable epmem with dc trigger +epmem --set trigger dc +epmem --set learning on + +# Enable smem +smem --set learning on + +# Enable consolidation with short interval for testing +epmem --set consolidate on +epmem --set consolidate-interval 15 +epmem --set consolidate-threshold 10 + +# Enable eviction: episodes older than 12 cycles get evicted +epmem --set consolidate-evict-age 12 + +### Initialize: create a stable structure that will persist +sp {propose*init + (state ^superstate nil + -^name) +--> + ( ^operator +) + ( ^name init) +} + +sp {apply*init + (state ^operator.name init) +--> + ( ^name eviction-test + ^item + ^counter 0) + ( ^color red ^shape circle) +} + +### Count decision cycles to keep the agent running +sp {propose*count + (state ^name eviction-test + ^counter ) +--> + ( ^operator + =) + ( ^name count) +} + +sp {apply*count + (state ^operator.name count + ^counter ) +--> + ( ^counter -) + ( ^counter (+ 1)) +} diff --git a/UnitTests/SoarUnitTests/EpMemFunctionalTests.cpp b/UnitTests/SoarUnitTests/EpMemFunctionalTests.cpp index ac4e95d2bb..2edddf7755 100644 --- a/UnitTests/SoarUnitTests/EpMemFunctionalTests.cpp +++ b/UnitTests/SoarUnitTests/EpMemFunctionalTests.cpp @@ -303,6 +303,50 @@ void EpMemFunctionalTests::testEpmemUnit_14() runTest("epmem_unit_test_14", 113); } +void EpMemFunctionalTests::testConsolidation() +{ + runTestSetup("testConsolidation"); + agent->RunSelf(25); + + // Verify consolidation wrote to smem + std::string smemResult = agent->ExecuteCommandLine("p @"); + assertTrue_msg("SMem should contain consolidated entries after 25 cycles:\n" + smemResult, + smemResult.find("red") != std::string::npos); +} + +void EpMemFunctionalTests::testConsolidationOff() +{ + runTestSetup("testConsolidation"); + agent->ExecuteCommandLine("epmem --set consolidate off"); + agent->RunSelf(25); + + // Verify smem has no consolidated entries when consolidation is off + std::string smemResult = agent->ExecuteCommandLine("p @"); + assertTrue_msg("SMem should not contain 'red' with consolidation off", + smemResult.find("red") == std::string::npos); +} + +void EpMemFunctionalTests::testConsolidationEviction() +{ + runTestSetup("testConsolidationEviction"); + agent->RunSelf(25); + + // Verify consolidation still wrote to smem + std::string smemResult = agent->ExecuteCommandLine("p @"); + assertTrue_msg("SMem should contain consolidated entries after 25 cycles:\n" + smemResult, + smemResult.find("red") != std::string::npos); + + // Verify old episodes were evicted (episode 1 should be gone, evict_before = 25 - 12 = 13) + std::string ep1 = agent->ExecuteCommandLine("epmem --print 1"); + assertTrue_msg("Episode 1 should have been evicted:\n" + ep1, + ep1.find("Episode 1") == std::string::npos); + + // Verify recent episodes still exist (episode 20 should still be there) + std::string ep20 = agent->ExecuteCommandLine("epmem --print 20"); + assertTrue_msg("Episode 20 should still exist:\n" + ep20, + ep20.find("Episode 20") != std::string::npos); +} + void EpMemFunctionalTests::testEpMemSmemFactorizationCombinationTest() { runTestSetup("testSMemEpMemFactorization"); diff --git a/UnitTests/SoarUnitTests/EpMemFunctionalTests.hpp b/UnitTests/SoarUnitTests/EpMemFunctionalTests.hpp index 8c50d59e25..968da06977 100644 --- a/UnitTests/SoarUnitTests/EpMemFunctionalTests.hpp +++ b/UnitTests/SoarUnitTests/EpMemFunctionalTests.hpp @@ -58,8 +58,11 @@ class EpMemFunctionalTests : public FunctionalTestHarness { TEST(testWMELength_FiveCycle, -1) TEST(testWMELength_InfiniteCycle, -1) TEST(testWMELength_MultiCycle, -1) - TEST(testWMELength_OneCycle, -1); - + TEST(testWMELength_OneCycle, -1) + TEST(testConsolidation, -1) + TEST(testConsolidationOff, -1) + TEST(testConsolidationEviction, -1); + void testAfterEpMem(); void testAllNegQueriesEpMem(); void testBeforeAfterProhibitEpMem(); @@ -105,6 +108,9 @@ class EpMemFunctionalTests : public FunctionalTestHarness { void testWMELength_InfiniteCycle(); void testWMELength_MultiCycle(); void testWMELength_OneCycle(); + void testConsolidation(); + void testConsolidationOff(); + void testConsolidationEviction(); void after(bool caught) { tearDown(caught); } void tearDown(bool caught);