Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions orion/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ def parse_graph_spec(self, graph_spec_yaml):
graph_wide_strict_norm = graph_yaml.get('strict_normalization', None)
add_edge_id = graph_yaml.get('add_edge_id', None)
edge_id_type = graph_yaml.get('edge_id_type', None)
overwrite_edge_ids = graph_yaml.get('overwrite_edge_ids', True)
edge_merging_attributes = graph_yaml.get('edge_merging_attributes', None)
if graph_wide_conflation is not None and type(graph_wide_conflation) != bool:
raise GraphSpecError(f'Invalid type (conflation: {graph_wide_conflation}), must be true or false.')
Expand All @@ -578,6 +579,8 @@ def parse_graph_spec(self, graph_spec_yaml):
raise GraphSpecError(f'Invalid type (add_edge_id: {add_edge_id}), must be true or false.')
if edge_id_type is not None and edge_id_type not in ('orion', 'uuid'):
raise GraphSpecError(f'Invalid edge_id_type: {edge_id_type}, must be "orion" or "uuid".')
if type(overwrite_edge_ids) != bool:
raise GraphSpecError(f'Invalid type (overwrite_edge_ids: {overwrite_edge_ids}), must be true or false.')
if edge_id_type is not None and add_edge_id is None or add_edge_id is False:
add_edge_id = True
if graph_wide_node_norm_version == 'latest':
Expand Down Expand Up @@ -608,6 +611,7 @@ def parse_graph_spec(self, graph_spec_yaml):
graph_output_format=graph_output_format,
add_edge_id=add_edge_id,
edge_id_type=edge_id_type,
overwrite_edge_ids=overwrite_edge_ids,
edge_merging_attributes=edge_merging_attributes,
subgraphs=subgraph_sources,
sources=data_sources)
Expand Down
19 changes: 17 additions & 2 deletions orion/kgx_file_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ def merge(self):
self.merge_metadata['final_edge_count'] += merged_edges_written + unmerged_edges_written
self.merge_metadata['merged_nodes'] += self.node_graph_merger.merged_node_counter
self.merge_metadata['merged_edges'] += self.edge_graph_merger.merged_edge_counter
for merger in (self.node_graph_merger, self.edge_graph_merger):
for warning_type in ('mismatched_properties', 'dropped_properties'):
existing = set(self.merge_metadata['merge_warnings'][warning_type])
existing.update(merger.merge_warnings[warning_type])
self.merge_metadata['merge_warnings'][warning_type] = sorted(existing)

def merge_primary_sources(self,
graph_sources: list):
Expand Down Expand Up @@ -215,23 +220,33 @@ def init_edge_graph_merger(self, save_memory: bool = False) -> GraphMerger:
needs_on_disk_merge = True
break

pre_merge_mapping_file_path = None
if self.graph_spec.add_edge_id and not self.graph_spec.overwrite_edge_ids and self.output_directory:
pre_merge_mapping_file_path = os.path.join(self.output_directory, 'pre_merge_edge_id_mapping.jsonl')

if needs_on_disk_merge:
if self.output_directory is None:
raise IOError(f'DiskGraphMerger attempted but no output directory was specified.')
return DiskGraphMerger(temp_directory=self.output_directory,
edge_merging_attributes=self.graph_spec.edge_merging_attributes,
add_edge_id=self.graph_spec.add_edge_id,
edge_id_type=self.graph_spec.edge_id_type)
edge_id_type=self.graph_spec.edge_id_type,
overwrite_edge_ids=self.graph_spec.overwrite_edge_ids,
pre_merge_mapping_file_path=pre_merge_mapping_file_path)
else:
return MemoryGraphMerger(edge_merging_attributes=self.graph_spec.edge_merging_attributes,
add_edge_id=self.graph_spec.add_edge_id,
edge_id_type=self.graph_spec.edge_id_type)
edge_id_type=self.graph_spec.edge_id_type,
overwrite_edge_ids=self.graph_spec.overwrite_edge_ids,
pre_merge_mapping_file_path=pre_merge_mapping_file_path)

@staticmethod
def init_merge_metadata():
return {'sources': {},
'merged_nodes': 0,
'merged_edges': 0,
'merge_warnings': {'mismatched_properties': [],
'dropped_properties': []},
'final_node_count': 0,
'final_edge_count': 0}

Expand Down
54 changes: 49 additions & 5 deletions orion/kgx_file_normalizer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import os
import json
import jsonlines
import logging
from orion.biolink_constants import (SEQUENCE_VARIANT, RETRIEVAL_SOURCES, PRIMARY_KNOWLEDGE_SOURCE,
AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS, OBJECT_ID, SUBJECT_ID, PREDICATE,
SUBCLASS_OF, ORIGINAL_OBJECT, ORIGINAL_SUBJECT)
from collections import defaultdict

from orion.biolink_constants import (SEQUENCE_VARIANT, RETRIEVAL_SOURCES, PRIMARY_KNOWLEDGE_SOURCE, OBJECT_ID,
SUBJECT_ID, PREDICATE, SUBCLASS_OF, ORIGINAL_OBJECT, ORIGINAL_SUBJECT)
from orion.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \
NormalizationFailedError
from orion.utils import chunk_iterator
from orion.logging import get_orion_logger
from orion.kgx_file_writer import KGXFileWriter


EDGE_PROPERTIES_THAT_SHOULD_BE_SETS = {AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS}
NODE_NORMALIZATION_BATCH_SIZE = 1_000_000
EDGE_NORMALIZATION_BATCH_SIZE = 1_000_000

Expand Down Expand Up @@ -198,11 +197,17 @@ def normalize_node_file(self):
for failed_node_id, error_message in variant_node_norm_failures.items():
failed_norm_file.write(f'{failed_node_id}\t{error_message}\n')

# compute per-prefix normalization stats
prefix_stats = self.compute_by_prefix_stats(
node_norm_lookup=self.node_normalizer.node_normalization_lookup
)

# update the metadata
self.normalization_metadata.update({
'node_count_pre_normalization': regular_nodes_pre_norm,
'node_count_post_normalization': regular_nodes_post_norm,
'node_normalization_failures': len(regular_node_norm_failures),
'normalization_by_prefix': prefix_stats,
})
if self.has_sequence_variants:
self.normalization_metadata.update({
Expand Down Expand Up @@ -278,6 +283,7 @@ def normalize_edge_file(self):
else:
normalized_predicate = edge[PREDICATE]
edge_inverted_by_normalization = False
normalized_edge_properties = None

# a counter for the number of normalized edges coming from a single source edge
# it's only used to determine how many edge splits occurred
Expand Down Expand Up @@ -357,6 +363,44 @@ def normalize_edge_file(self):
'final_normalized_edges': normalized_edge_count
})

@staticmethod
def compute_by_prefix_stats(node_norm_lookup: dict):
# count totals, failures, and post-normalization prefixes per original prefix
prefix_total = defaultdict(int)
prefix_failed = defaultdict(int)
prefix_normalized_to = defaultdict(lambda: defaultdict(int))
for node_id, normalized_ids in node_norm_lookup.items():
original_prefix = node_id.split(':')[0]
prefix_total[original_prefix] += 1
if normalized_ids:
for normalized_id in normalized_ids:
prefix_normalized_to[original_prefix][normalized_id.split(':')[0]] += 1
else:
prefix_failed[original_prefix] += 1

prefix_stats = {}
# sort the prefixes by their total count, the most common prefix is at the top of the list
for prefix in sorted(prefix_total, key=prefix_total.get, reverse=True):
failed = prefix_failed.get(prefix, 0)
total = prefix_total[prefix]
succeeded = total - failed
percentage = int(succeeded / total * 10000) / 100 # truncate percentage to two decimal places
prefix_stats[prefix] = {
'succeeded': succeeded,
'failed': failed,
'total': total,
'success_rate': percentage,
# sort normalized_to by curie counts, not alphabetically
'normalized_to': dict(sorted(prefix_normalized_to.get(prefix, {}).items(),
key=lambda x: x[1], reverse=True)),
}
if succeeded == 0:
logger.error(f'!!! --- ATTENTION: Curie prefix "{prefix}" failed normalization for all nodes! --- !!!')
elif percentage < 50:
logger.warning(f'WARNING: Curie prefix "{prefix}" only normalized {percentage}% of nodes!!!')
return prefix_stats


def invert_edge(edge):
inverted_edge = {}
for key, value in edge.items():
Expand Down
2 changes: 2 additions & 0 deletions orion/kgxmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class GraphSpec:
graph_version: str
graph_output_format: str
add_edge_id: bool = None
overwrite_edge_ids: bool = True
edge_id_type: str = None
edge_merging_attributes: list = None
sources: list = None
Expand All @@ -58,6 +59,7 @@ def get_metadata_representation(self):
'graph_version': self.graph_version,
'edge_merging_attributes': self.edge_merging_attributes,
'add_edge_id': self.add_edge_id,
'overwrite_edge_ids': self.overwrite_edge_ids,
'edge_id_type': self.edge_id_type,
'subgraphs': [subgraph.get_metadata_representation() for subgraph in self.subgraphs] if self.subgraphs else [],
'sources': [source.get_metadata_representation() for source in self.sources] if self.sources else []
Expand Down
Loading
Loading