Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 124 additions & 6 deletions src/db_loader.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ extern struct HandleList EMPTY_HANDLE_LIST;
// MongoDB reusable globals
static mongoc_database_t *MONGODB = NULL;
static mongoc_client_t *MONGODB_CLIENT = NULL;
static mongoc_collection_t *MONGODB_TYPES= NULL;
static mongoc_collection_t *MONGODB_ATOMS= NULL;
static mongoc_collection_t *MONGODB_TYPES = NULL;
static mongoc_collection_t *MONGODB_NODES = NULL;
static mongoc_collection_t *MONGODB_LINKS = NULL;
static mongoc_collection_t *MONGODB_CONFIG = NULL;
static bson_t MONGODB_REPLY = BSON_INITIALIZER;
static bson_error_t MONGODB_ERROR = {0};
static bson_t *MONGODB_REPLACE_OPTIONS = NULL;
Expand Down Expand Up @@ -120,12 +122,19 @@ static char *COMPOSITE_TYPE_TYPEDEF_HASH = NULL;
S != ARROW_HASH && \
S != COMPOSITE_TYPE_TYPEDEF_HASH) free(S);

// Pattern configuration
static unsigned int *PATTERNS_POSITIONS_TO_IGNORE = NULL;
static unsigned int PATTERNS_POSITIONS_TO_IGNORE_COUNT = 0;
static bool PATTERNS_TO_IGNORE_LOADED = false;
static char PATTERNS_TO_IGNORE_ID[] = "1";

static void mongodb_destroy() {
mongoc_database_destroy(MONGODB);
mongoc_client_destroy(MONGODB_CLIENT);
mongoc_collection_destroy(MONGODB_TYPES);
mongoc_collection_destroy(MONGODB_ATOMS);
mongoc_collection_destroy(MONGODB_NODES);
mongoc_collection_destroy(MONGODB_LINKS);
mongoc_collection_destroy(MONGODB_CONFIG);
bson_destroy(&MONGODB_REPLY);
bson_destroy(MONGODB_REPLACE_OPTIONS);
bson_destroy(MONGODB_INSERT_MANY_OPTIONS);
Expand Down Expand Up @@ -164,7 +173,9 @@ static void mongodb_setup() {
mongodb_error("Failed to create a MongoDB client.");
}
MONGODB_TYPES = mongoc_client_get_collection(MONGODB_CLIENT, "das", "atom_types");
MONGODB_ATOMS = mongoc_client_get_collection(MONGODB_CLIENT, "das", "atoms");
MONGODB_NODES = mongoc_client_get_collection(MONGODB_CLIENT, "das", "nodes");
MONGODB_LINKS = mongoc_client_get_collection(MONGODB_CLIENT, "das", "links");
MONGODB_CONFIG = mongoc_client_get_collection(MONGODB_CLIENT, "das", "config");

MONGODB_REPLACE_OPTIONS = bson_new();
BSON_APPEND_BOOL(MONGODB_REPLACE_OPTIONS, "upsert", true);
Expand Down Expand Up @@ -357,7 +368,7 @@ static void flush_symbol_buffer() {
SYMBOL_BUFFER[i].value_as_float);
}
bool mongo_ok = mongoc_collection_insert_many(
MONGODB_ATOMS,
MONGODB_NODES,
(const bson_t **) bulk_insertion_buffer,
new_size,
MONGODB_INSERT_MANY_OPTIONS,
Expand All @@ -376,7 +387,108 @@ static void flush_symbol_buffer() {
#endif
}

static void load_patterns_to_ignore() {
if (PATTERNS_TO_IGNORE_LOADED) {
return; // Already loaded or failed to load
}

bson_t *query = bson_new();
BSON_APPEND_UTF8(query, "_id", PATTERNS_TO_IGNORE_ID);

mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(MONGODB_CONFIG, query, NULL, NULL);
if (!cursor) {
if (DEBUG) fprintf(stdout, "Failed to create cursor for pattern configuration\n");
bson_destroy(query);
// Failed to load, set this to true to avoid querying the database again
PATTERNS_TO_IGNORE_LOADED = true;
return;
}

const bson_t *doc;
if (mongoc_cursor_next(cursor, &doc)) {
bson_iter_t iter;
if (bson_iter_init_find(&iter, doc, "positions_to_ignore") && BSON_ITER_HOLDS_ARRAY(&iter)) {
bson_iter_t array_iter;
if (bson_iter_recurse(&iter, &array_iter)) {
// Count the number of allowed positions
unsigned int count = 0;
bson_iter_t count_iter;
bson_iter_recurse(&iter, &count_iter);
while (bson_iter_next(&count_iter)) {
count++;
}

// Allocate memory for the allowed positions
PATTERNS_POSITIONS_TO_IGNORE = (unsigned int *) malloc(count * sizeof(unsigned int));
PATTERNS_POSITIONS_TO_IGNORE_COUNT = count;

// Fill the array with allowed positions
unsigned int index = 0;
bson_iter_recurse(&iter, &array_iter);
while (bson_iter_next(&array_iter) && index < count) {
if (BSON_ITER_HOLDS_INT32(&array_iter)) {
PATTERNS_POSITIONS_TO_IGNORE[index] = (unsigned int) bson_iter_int32(&array_iter);
index++;
}
}

if (DEBUG) {
fprintf(stdout, "Loaded %d positions_to_ignore from configs:\n", count);
for (unsigned int i = 0; i < count; i++) {
fprintf(stdout, "%u, ", PATTERNS_POSITIONS_TO_IGNORE[i]);
}
fprintf(stdout, "\n");
}
}
} else {
if (DEBUG) fprintf(stdout, "No positions array found in pattern configuration\n");
}
} else {
if (DEBUG) fprintf(stdout, "No pattern configuration found, using default behavior\n");
}

mongoc_cursor_destroy(cursor);
bson_destroy(query);

PATTERNS_TO_IGNORE_LOADED = true;
}

// Patterns to ignore document example (must be stored in "configs" collection):
// {
// "_id": "1",
// "positions_to_ignore": [0, 2]
// }
static bool patterns_keys_to_ignore(char **composite_key) {
// Load patterns to ignore if not already loaded
if (!PATTERNS_TO_IGNORE_LOADED) {
load_patterns_to_ignore();
}

// If no configuration or no allowed positions, store all patterns
if (!PATTERNS_TO_IGNORE_LOADED || PATTERNS_POSITIONS_TO_IGNORE_COUNT == 0) {
return false;
}

// Check each position in the composite key
for (unsigned int i = 0; i < MAX_INDEXABLE_ARITY; i++) {
if (composite_key[i] == NULL) {
break;
}
if (strcmp(composite_key[i], "*") == 0) {
// Check if position i is in the allowed positions list
for (unsigned int j = 0; j < PATTERNS_POSITIONS_TO_IGNORE_COUNT; j++) {
if (PATTERNS_POSITIONS_TO_IGNORE[j] == i) {
return true;
}
}
}
}

return false;
}

static void add_redis_pattern(char **composite_key, unsigned int arity, char *value) {
if (patterns_keys_to_ignore(composite_key)) { return; }
char *key = expression_hash(EXPRESSION_HASH, composite_key, arity);
REDIS_APPEND_COMMAND_MACRO(REDIS, "ZADD %s:%s %ld %s", PATTERNS, key, PATTERNS_SCORE, value);
PATTERNS_SCORE++;
Expand Down Expand Up @@ -644,7 +756,7 @@ static void flush_expression_buffer() {
}

bool mongodb_ok = mongoc_collection_insert_many(
MONGODB_ATOMS,
MONGODB_LINKS,
(const bson_t **) bulk_insertion_buffer,
new_size,
MONGODB_INSERT_MANY_OPTIONS,
Expand Down Expand Up @@ -773,6 +885,7 @@ void initialize_actions() {
COMPOSITE_TYPE_TYPEDEF_HASH = composite_hash((char **) composite_type_typedef, 3);
redis_setup();
mongodb_setup();
load_patterns_to_ignore();
insert_commom_atoms();
#ifndef SUPPRESS_PROGRESS_BAR
print_progress_bar(yylineno, INPUT_LINE_COUNT, 1, 1, false);
Expand All @@ -789,6 +902,11 @@ void finalize_actions() {
if (PENDING_REDIS_COMMANDS > 0) {
flush_redis_commands();
}

if (PATTERNS_POSITIONS_TO_IGNORE != NULL) {
free(PATTERNS_POSITIONS_TO_IGNORE);
}

REDIS_FREE_MACRO(REDIS);
mongodb_destroy();

Expand Down