Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sc-memory/sc-core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ file(GLOB SOURCES CONFIGURE_DEPENDS
find_glib()

add_library(sc-core SHARED ${SOURCES})
target_link_libraries(sc-core LINK_PRIVATE ${glib_LIBRARIES})
target_link_libraries(sc-core LINK_PRIVATE ${glib_LIBRARIES} aio )
target_include_directories(sc-core
PRIVATE ${glib_INCLUDE_DIRS}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
Expand Down
194 changes: 152 additions & 42 deletions sc-memory/sc-core/src/sc-store/sc-fs-memory/sc_dictionary_fs_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,93 @@
# define DEFAULT_STRING_INT_SIZE 20
# define DEFAULT_MAX_SEARCHABLE_STRING_SIZE 1000

#define BUFFER_SIZE_LIMIT (10 * 1024 * 1024) // 10 МБ


typedef struct
{
sc_list * link_hashes;
sc_uint64 string_offset;
} sc_link_hash_content;



void _sc_dictionary_fs_memory_buffer_init(sc_write_buffer ** buffer, sc_uint64 capacity)
{
*buffer = sc_mem_new(sc_write_buffer, 1);
(*buffer)->data = sc_mem_new(sc_char, capacity);
(*buffer)->size = 0;
(*buffer)->capacity = capacity;
sc_monitor_init(&(*buffer)->buffer_monitor);
}

// Уничтожение буфера
void _sc_dictionary_fs_memory_buffer_destroy(sc_write_buffer * buffer)
{
if (buffer == null_ptr)
return;
sc_mem_free(buffer->data);
sc_monitor_destroy(&buffer->buffer_monitor);
sc_mem_free(buffer);
}


// Добавление данных в буфер
sc_bool _sc_dictionary_fs_memory_buffer_append(
sc_write_buffer * buffer,
sc_char const * data,
sc_uint64 size)
{
sc_monitor_acquire_write(&buffer->buffer_monitor);
if (buffer->size + size > buffer->capacity)
{
sc_monitor_release_write(&buffer->buffer_monitor);
return SC_FALSE; // Буфер переполнен
}
sc_mem_cpy(buffer->data + buffer->size, data, size);
buffer->size += size;
sc_monitor_release_write(&buffer->buffer_monitor);
return SC_TRUE;
}



// Сброс буфера на диск
sc_dictionary_fs_memory_status _sc_dictionary_fs_memory_buffer_flush(
sc_dictionary_fs_memory * memory,
sc_write_buffer * buffer,
sc_io_channel * channel,
sc_monitor * channel_monitor)
{
sc_monitor_acquire_write(&buffer->buffer_monitor);
if (buffer->size == 0)
{
sc_monitor_release_write(&buffer->buffer_monitor);
return SC_FS_MEMORY_OK;
}

sc_monitor_acquire_write(channel_monitor);
sc_uint64 written_bytes = 0;
if (sc_io_channel_write_chars(channel, buffer->data, buffer->size, &written_bytes, null_ptr)
!= SC_FS_IO_STATUS_NORMAL
|| written_bytes != buffer->size)
{
sc_fs_memory_error("Error while flushing buffer to disk");
sc_monitor_release_write(channel_monitor);
sc_monitor_release_write(&buffer->buffer_monitor);
return SC_FS_MEMORY_WRITE_ERROR;
}
memory->last_string_offset += written_bytes;
buffer->size = 0; // Очищаем буфер
sc_monitor_release_write(channel_monitor);
sc_monitor_release_write(&buffer->buffer_monitor);
return SC_FS_MEMORY_OK;
}





sc_io_channel * _sc_dictionary_fs_memory_get_strings_channel_by_offset(
sc_dictionary_fs_memory * memory,
sc_uint64 strings_offset,
Expand Down Expand Up @@ -156,6 +237,9 @@ sc_dictionary_fs_memory_status sc_dictionary_fs_memory_initialize_ext(
_sc_number_dictionary_initialize(&(*memory)->string_offsets_link_hashes_dictionary);
static sc_char const * string_offsets_link_hashes = "string_offsets_link_hashes" SC_FS_EXT;
sc_fs_concat_path((*memory)->path, string_offsets_link_hashes, &(*memory)->string_offsets_link_hashes_path);

// Инициализация буфера
_sc_dictionary_fs_memory_buffer_init(&(*memory)->write_buffer, BUFFER_SIZE_LIMIT);
}
sc_fs_memory_info("Configuration:");
sc_message("\tSc-dictionary node size: %zd", sizeof(sc_dictionary_node));
Expand All @@ -171,7 +255,6 @@ sc_dictionary_fs_memory_status sc_dictionary_fs_memory_initialize_ext(
sc_message("\tTerm separators: \"%s\"", (*memory)->term_separators);

sc_fs_memory_info("Successfully initialized");

return SC_FS_MEMORY_OK;

error:
Expand Down Expand Up @@ -203,25 +286,29 @@ sc_dictionary_fs_memory_status sc_dictionary_fs_memory_shutdown(sc_dictionary_fs

sc_fs_memory_info("Shutdown");
{
sc_mem_free(memory->path);

// Сброс буфера перед завершением
for (sc_uint64 i = 0; i < memory->max_strings_channels && memory->strings_channels[i] != null_ptr; ++i)
{
sc_dictionary_destroy(memory->terms_string_offsets_dictionary, _sc_dictionary_fs_memory_node_clear);
sc_mem_free(memory->terms_string_offsets_path);

for (sc_uint64 i = 0; i < memory->max_strings_channels && memory->strings_channels[i] != null_ptr; ++i)
{
sc_io_channel_shutdown(memory->strings_channels[i], SC_TRUE, null_ptr);
}
sc_mem_free(memory->strings_channels);
_sc_monitor_table_destroy(&memory->strings_channels_monitors_table);
sc_monitor_destroy(&memory->monitor);
sc_monitor_destroy(&memory->resolve_string_offset_monitor);
sc_monitor * channel_monitor =
sc_monitor_table_get_monitor_from_table(&memory->strings_channels_monitors_table, (sc_pointer)i);
_sc_dictionary_fs_memory_buffer_flush(
memory, memory->write_buffer, memory->strings_channels[i], channel_monitor);
sc_io_channel_shutdown(memory->strings_channels[i], SC_TRUE, null_ptr);
}

sc_mem_free(memory->path);
sc_dictionary_destroy(memory->terms_string_offsets_dictionary, _sc_dictionary_fs_memory_node_clear);
sc_mem_free(memory->terms_string_offsets_path);
sc_mem_free(memory->strings_channels);
_sc_monitor_table_destroy(&memory->strings_channels_monitors_table);
sc_monitor_destroy(&memory->monitor);
sc_monitor_destroy(&memory->resolve_string_offset_monitor);
sc_dictionary_destroy(memory->link_hashes_string_offsets_dictionary, _sc_dictionary_fs_memory_string_node_clear);
sc_dictionary_destroy(memory->string_offsets_link_hashes_dictionary, _sc_dictionary_fs_memory_link_node_clear);
sc_mem_free(memory->string_offsets_link_hashes_path);

// Уничтожение буфера
_sc_dictionary_fs_memory_buffer_destroy(memory->write_buffer);
}
sc_mem_free(memory);

Expand Down Expand Up @@ -416,60 +503,73 @@ sc_dictionary_fs_memory_status _sc_dictionary_fs_memory_write_string(
if (strings_channel == null_ptr)
goto no_last_channel_error;

// find string if it exists in fs-memory
// Проверяем, существует ли строка
if (is_searchable_string)
{
*string_offset =
_sc_dictionary_fs_memory_get_string_offset_by_string(memory, string, string_size, string_terms->begin->data);
}

sc_monitor_acquire_write(&memory->monitor);
sc_monitor_acquire_write(channel_monitor);
*is_not_exist = (*string_offset == INVALID_STRING_OFFSET);
// save string in fs-memory
if (*is_not_exist)
{
*string_offset = memory->last_string_offset;

sc_uint64 const normalized_string_offset = _sc_dictionary_fs_memory_normalize_offset(memory, *string_offset);
sc_io_channel_seek(strings_channel, normalized_string_offset, SC_FS_IO_SEEK_SET, null_ptr);

sc_uint64 written_bytes = 0;
if (sc_io_channel_write_chars(strings_channel, &string_size, sizeof(string_size), &written_bytes, null_ptr)
!= SC_FS_IO_STATUS_NORMAL
|| sizeof(string_size) != written_bytes)
// Добавляем размер строки и строку в буфер
sc_monitor_acquire_write(&memory->monitor);
if (!_sc_dictionary_fs_memory_buffer_append(memory->write_buffer, (sc_char *)&string_size, sizeof(string_size)))
{
sc_fs_memory_error("Error while attribute `size` writing");
goto write_error;
// Буфер заполнен, сбрасываем его
sc_dictionary_fs_memory_status status =
_sc_dictionary_fs_memory_buffer_flush(memory, memory->write_buffer, strings_channel, channel_monitor);
if (status != SC_FS_MEMORY_OK)
{
sc_monitor_release_write(&memory->monitor);
goto write_error;
}
// Повторяем попытку добавления
if (!_sc_dictionary_fs_memory_buffer_append(memory->write_buffer, (sc_char *)&string_size, sizeof(string_size)))
{
sc_fs_memory_error("Failed to append string size to buffer after flush");
sc_monitor_release_write(&memory->monitor);
goto write_error;
}
}

memory->last_string_offset += written_bytes;

if (sc_io_channel_write_chars(strings_channel, string, string_size, &written_bytes, null_ptr)
!= SC_FS_IO_STATUS_NORMAL
|| string_size != written_bytes)
if (!_sc_dictionary_fs_memory_buffer_append(memory->write_buffer, string, string_size))
{
sc_fs_memory_error("Error while attribute `string` writing");
goto write_error;
// Буфер заполнен, сбрасываем его
sc_dictionary_fs_memory_status status =
_sc_dictionary_fs_memory_buffer_flush(memory, memory->write_buffer, strings_channel, channel_monitor);
if (status != SC_FS_MEMORY_OK)
{
sc_monitor_release_write(&memory->monitor);
goto write_error;
}
// Повторяем попытку добавления
if (!_sc_dictionary_fs_memory_buffer_append(memory->write_buffer, string, string_size))
{
sc_fs_memory_error("Failed to append string to buffer after flush");
sc_monitor_release_write(&memory->monitor);
goto write_error;
}
}

memory->last_string_offset += written_bytes;
sc_monitor_release_write(&memory->monitor);
}

sc_monitor_release_write(channel_monitor);
sc_monitor_release_write(&memory->monitor);
sc_monitor_release_write(&memory->resolve_string_offset_monitor);
return SC_FS_MEMORY_OK;

write_error:
sc_monitor_release_write(channel_monitor);
sc_monitor_release_write(&memory->monitor);
sc_monitor_release_write(&memory->resolve_string_offset_monitor);
return SC_FS_MEMORY_WRITE_ERROR;

no_last_channel_error:
sc_monitor_release_write(&memory->resolve_string_offset_monitor);
return SC_FS_MEMORY_WRITE_ERROR;
}



sc_dictionary_fs_memory_status _sc_dictionary_fs_memory_write_string_terms_string_offset(
sc_dictionary_fs_memory * memory,
sc_uint64 const string_offset,
Expand Down Expand Up @@ -1698,6 +1798,17 @@ sc_dictionary_fs_memory_status sc_dictionary_fs_memory_save(sc_dictionary_fs_mem
}

sc_fs_memory_info("Save sc-fs-memory dictionaries");
// Сбрасываем буфер перед сохранением
for (sc_uint64 i = 0; i < memory->max_strings_channels && memory->strings_channels[i] != null_ptr; ++i)
{
sc_monitor * channel_monitor =
sc_monitor_table_get_monitor_from_table(&memory->strings_channels_monitors_table, (sc_pointer)i);
sc_dictionary_fs_memory_status status =
_sc_dictionary_fs_memory_buffer_flush(memory, memory->write_buffer, memory->strings_channels[i], channel_monitor);
if (status != SC_FS_MEMORY_OK)
return status;
}

sc_dictionary_fs_memory_status status = _sc_dictionary_fs_memory_save_term_string_offsets(memory);
if (status != SC_FS_MEMORY_OK)
return status;
Expand All @@ -1707,7 +1818,6 @@ sc_dictionary_fs_memory_status sc_dictionary_fs_memory_save(sc_dictionary_fs_mem
return status;

sc_message("\tLast string offset: %" PRIu64, memory->last_string_offset);

sc_fs_memory_info("All sc-fs-memory dictionaries saved");
return status;
}
Expand Down
Loading