diff --git a/.github/workflows/codespell.yml b/.github/workflows/codespell.yml new file mode 100644 index 0000000..ba7ed47 --- /dev/null +++ b/.github/workflows/codespell.yml @@ -0,0 +1,44 @@ +# Codespell configuration is within pyproject.toml +--- +name: Codespell + +on: [push] +#on: +# push: +# branches: [main] +# pull_request: +# branches: [main] + +permissions: + contents: read + +jobs: + codespell: + name: Check for spelling errors + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Codespell + uses: codespell-project/actions-codespell@v2 + with: + ignore_words_list: Crate,crate,fo,tre + + formatting: + permissions: + contents: read # for actions/checkout to fetch code + pull-requests: write # for marocchino/sticky-pull-request-comment to create or update PR comment + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: mamba-org/setup-micromamba@v1 + with: + environment-name: black + create-args: -c bioconda -c conda-forge black + cache-environment: false + + - name: Check formatting + shell: bash -el {0} + run: black --check --diff . diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..9ca7810 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,32 @@ +name: pytest + +on: [push] + +jobs: + testing: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.11", "3.13"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install lib from source + shell: bash -el {0} + run: | + python -m pip install --upgrade pip pytest pandas + python -m pip install . 
+ + - name: Test local + env: + CI: true + #PYTHONTRACEMALLOC: 10 + shell: bash -el {0} + run: | + python -m pytest -v --show-capture=stderr \ + tests.py diff --git a/README.rst b/README.rst index 6e55369..5f96ba4 100644 --- a/README.rst +++ b/README.rst @@ -27,15 +27,15 @@ Records can be accessed through this list or by id. ``GenomeDiff`` is iterable a :: >>> from genomediff import * - >>> document = GenomeDiff.read(open('MyDiff.gd', 'r', encoding='utf-8')) + >>> document = GenomeDiff.read("MyDiff.gd") >>> document.metadata - {'GENOME_DIFF': '1.0', 'AUTHOR': ''} + {'GENOME_DIFF': ['1.0'], 'AUTHOR': ['']} >>> document.mutations[0] Record('SNP', 1, [191], new_seq='A', seq_id='NC_000913', snp_type='intergenic', position=12346) >>> document.mutations[0].parent_ids [191] >>> document[191] Record('RA', 191, None, tot_cov='46/42', new_base='A', insert_position=0, ref_base='G', seq_id='NC_000913', quality=252.9, position=12345) - >>> document.mutations[0].parents + >>> document.records.parents_of[0] [Record('RA', 191, None, tot_cov='46/42', new_base='A', insert_position=0, ref_base='G', seq_id='NC_000913', quality=252.9, position=12345)] >>> document.write(open('NewDiff.gd', 'w', encoding='utf-8')) diff --git a/src/genomediff/__init__.py b/src/genomediff/__init__.py index 366f4ee..d7902db 100644 --- a/src/genomediff/__init__.py +++ b/src/genomediff/__init__.py @@ -1,73 +1,78 @@ -import itertools -from genomediff.parser import GenomeDiffParser -from genomediff.records import Metadata +# -*- coding: utf-8 -*- +from pathlib import Path +from typing import TextIO +from .parser import Metadata, MetadataContainer, load_json_ref_cov, parse +from .records import RecordCollection -class GenomeDiff(object): - def __init__(self): - self.metadata = {} - self.mutations = [] - self.evidence = [] - self.validation = [] - self._index = {} +class GenomeDiff: + def __init__( + self, + metadata: MetadataContainer, + records: RecordCollection, + comments: "dict[int, str]|None" = None, + ): + 
self._metadata = metadata + self._records = records + self._comments = comments if comments else {} - @classmethod - def read(cls, fsock): - gd = GenomeDiff() + @property + def metadata(self): + return self._metadata + + @property + def records(self): + return self._records - for record in GenomeDiffParser(document=gd, fsock=fsock): + @classmethod + def read(cls, gdfile: "str|Path|TextIO"): + records = RecordCollection.new() + comments: dict[int, str] = {} + if hasattr(gdfile, "read"): + metadata = MetadataContainer(getattr(gdfile, "name", "")) + fsock: TextIO = gdfile # type: ignore[assignment] + else: + metadata = MetadataContainer(gdfile) + fsock = open(gdfile, "r") + self = cls(metadata, records, comments) + for i, record in parse(fsock): if isinstance(record, Metadata): - gd.metadata[record.name] = record.value + metadata.set(record.name, record.value) + elif isinstance(record, str): + comments[i] = record else: - if len(record.type) == 3: - gd.mutations.append(record) - if len(record.type) == 2: - gd.evidence.append(record) - if len(record.type) == 4: - gd.validation.append(record) - gd._index[record.id] = record - return gd - - def __getitem__(self, item): - return self._index[item] + record.document = self + records.set(record) + if not hasattr(gdfile, "read"): + fsock.close() + return self def write(self, fsock): - #Print our own version line first and ignore any in the metadata - print("#=GENOME_DIFF\t1.0", file = fsock) - for k, v in self.metadata.items(): - if k != "GENOME_DIFF": - print("#={}\t{}".format(k, v), file = fsock) - for record in itertools.chain(self.mutations, self.evidence, self.validation): - print(str(record), file = fsock) + print("#=GENOME_DIFF\t1.0", file=fsock) + for l in self.metadata.lines: + if l.name != "GENOME_DIFF": + print(l, file=fsock) + for record in self.records: + print(str(record), file=fsock) - def __len__(self): - return len(self.mutations) + len(self.evidence) + len(self.validation) + @property + def 
cov_summary(self): + if outdir := self.metadata.output: + return load_json_ref_cov(outdir) + raise AttributeError("No output directory found in metadata") - def __iter__(self): - return itertools.chain(self.mutations, self.evidence, self.validation) + @property + def mutations(self): + return list(self.records.mutation) - def __str__(self): - return '\n'.join(["MUTATIONS:",'\n'.join([str(x) for x in self.mutations]), - "EVIDENCE:",'\n'.join([str(x) for x in self.evidence]), - "VALIDATION:",'\n'.join(self.validation)]) + @property + def evidence(self): + return list(self.records.evidence) + @property + def validation(self): + return list(self.records.validation) - def remove(self,*args, mut_type=None): - ''' - Remove mutations that satisfy the given conditions. Implementation of - gdtools REMOVE for genomediff objects. - - Input: a variable number of conditions, e.g. 'gene_name==rrlA','frequency>=0.9'. - If mut_type is specified, only that mutation type will be removed. - Output: self.mutations is updated, with mutations satifying the conditions - having been removed. - ''' - updated_mutations = [] - for rec in self.mutations: - if (mut_type is None or mut_type == rec.type) and rec.satisfies(*args): - continue - else: - updated_mutations.append(rec) - - self.mutations = updated_mutations + def __getitem__(self, key): + return self.records[key] diff --git a/src/genomediff/parser.py b/src/genomediff/parser.py index 538d204..b8694b3 100644 --- a/src/genomediff/parser.py +++ b/src/genomediff/parser.py @@ -1,59 +1,220 @@ -from collections import OrderedDict +import json import re -from genomediff.records import TYPE_SPECIFIC_FIELDS, Metadata, Record - -class GenomeDiffParser(object): - def __init__(self, fsock=None, document=None): - self._document = document - self._fsock = fsock - - @staticmethod - def _convert_value(value): - for type_ in (int, float): - try: - return type_(value) - except ValueError: - pass - - if value == '.' 
or value == '': - value = None - return value - - def __iter__(self): - metadata_pattern = re.compile(r'^\#\=(\w+)\s+(.*)$') - mutation_pattern = re.compile(r'^(?P[A-Z]{2,4})' - '\t(?P\\d+)' - '\t((?P\\d+(,\\s*\\d+)*)|\\.?)' - '\t(?P.+)?$') - - for i, line in enumerate(self._fsock): - if not line: - continue - elif line.startswith('#'): - match = metadata_pattern.match(line) - if match: - yield Metadata(*match.group(1, 2)) +from collections import OrderedDict +from pathlib import Path +from time import strptime +from typing import Final, Iterable, NamedTuple +from warnings import warn + +from .records import RecordEnum + +METADATA_PATTERN: Final = re.compile(r"^\#\=([^\s]+)\s+(.*)$") +MUTATION_PATTERN: Final = re.compile( + r"^(?P[A-Z]{2,4})" + r"\t(?P\d+|\.)" + r"\t((?P\d+(,\s*\d+)*)|\.?)" + r"\t(?P.+)?$" +) + + +class Metadata(NamedTuple): + name: str + value: str + + def __repr__(self): + return f"#={repr(self.name)} {repr(self.value)}" + + +class MetadataContainer: + """https://github.com/barricklab/breseq/wiki/GenomeDiff-File-Format#metadata-lines + + Lines beginning with **#= ** are interpreted as metadata. (Thus, the first line is assigning a metadata item named GENOME_DIFF a value of 1.0.) Names cannot include whitespace characters. Values may include whitespace characters. In most cases, the values for lines with the same name are concatenated with single spaces added between them or interpreted as a list. + + Some of these metadata fields are used to name and sort samples by |gdtools| COMPARE, to find relevant files by |gdtools| RUNFILE, and by other utilities. 
+ """ + + def __init__(self, file_path: "str|Path"): + self._dict: OrderedDict[str, list[str]] = OrderedDict() + self.file_path = Path(file_path) + + @property + def lines(self): + for k, vs in self._dict.items(): + for v in vs: + yield Metadata(k, v) + + @property + def GENOME_DIFF(self): + """The first line of the file must define that this is a |GD| file and the version of the file specification used:: + + #=GENOME_DIFF 1.0 + """ + (version,) = self._dict["GENOME_DIFF"] + return version + + @property + def gd_version(self): + return self.GENOME_DIFF + + def check_multi_values(self, key: str): + if (values := self._dict.get(key)) is None: + return None + if len(values) > 1: + warn(f"Unexpected multiple values for {key}: {values}") + return values[-1] + + # Common but optional metadata fields include: + @property + def TITLE(self): + """The name of the sample. + If this field is not provided, the name of the |GD| file (removing the .gd suffix) is used for this field. + """ + if "TITLE" not in self._dict: + return self.file_path.with_suffix("").name + return self.check_multi_values("TITLE") + + @property + def AUTHOR(self): + """Name of person who curated the |GD| file.""" + return self.check_multi_values("AUTHOR") + + @property + def PROGRAM(self): + """Name and version of software program that generated the |GD| file.""" + return self.check_multi_values("PROGRAM") + + @property + def CREATED(self): + """Date on which the |GD| file was created.""" + t_ = self.check_multi_values("CREATED") + if t_ is None: + return None + return strptime(t_, "%H:%M:%S %d %b %Y") + + @property + def TIME(self): + """Time point the sample is from, in days, generations, or any other unit of measurement. Ex: 1, 2, 15000""" + return self.check_multi_values("TIME") + + @property + def POPULATION(self): + """Name/designation for the population the sample is from. 
Ex: Ara–3 / MA-1""" + return self.check_multi_values("POPULATION") + + @property + def TREATMENT(self): + """Experimental treatment group for this population. Ex: LB medium / LTEE""" + return self.check_multi_values("TREATMENT") + + @property + def CLONE(self): + """Name/designation for a clonal isolate, Ex: A, B, REL10863""" + return self.check_multi_values("CLONE") + + @property + def REFSEQ(self): + """Location of the reference sequence file. Ex: /here/is/an/absolute/path/to/the/file.gb""" + return [Path(i) for i in self._dict.get("REFSEQ", [])] + + @property + def ADAPTSEQ(self): + """Location of the adaptor sequence file. Ex: relative/path/to/the/adaptors.fa""" + return [Path(i) for i in self._dict.get("ADAPTSEQ", [])] + + @property + def READSEQ(self): + """Location of the read sequence file. Ex: https://place.org/url/for/file/download.fastq""" + return [Path(i) for i in self._dict.get("READSEQ", [])] + + @property + def COMMAND(self): + """Command line used to generate the |GD| file.""" + return self.check_multi_values("COMMAND") + + @property + def output(self): + if cmd := self.COMMAND: + cmds = cmd.split() + index = -1 + if (index := cmds.index("-o")) != -1: + return Path(cmds[index + 1]) + if (index := cmds.index("--output")) != -1: + return Path(cmds[index + 1]) + + def set(self, name: str, value: str): + self._dict.setdefault(name, []).append(value) + + @classmethod + def from_dict(cls, d: dict[str, "str|list"]): + self = cls("") + for k, v in d.items(): + if isinstance(v, list): + for i in v: + self.set(k, str(i)) else: - match = mutation_pattern.match(line) + self.set(k, str(v)) + return self + + def __eq__(self, value): + if isinstance(value, MetadataContainer): + return dict(self._dict) == dict(value._dict) + if isinstance(value, dict): + return self == self.from_dict(value) + + +def read_line(line: str): + if line.startswith("#"): + match = METADATA_PATTERN.match(line) + if match: + return Metadata(*match.group(1, 2)) + """ + Lines beginning with 
whitespace and # are comments. Comments may not occur at the end of a data line. + """ + return line + else: + match = MUTATION_PATTERN.match(line) + if match: + record_type = match.group("type") + assert record_type in RecordEnum._member_names_ + record = RecordEnum[record_type].value( + match.group("id"), + match.group("parent_ids"), + match.group("extra"), + ) + return record + return + - if match: - type = match.group('type') - id = int(match.group('id')) +def parse(fsock: Iterable[str]): + for line_ptr, line in enumerate(fsock): + if not line: + continue + record = read_line(line) + if record: + yield line_ptr, record + else: + raise Exception(f"Could not parse line #{line_ptr}: {line}") - parent_ids = match.group('parent_ids') - if parent_ids: - parent_ids = [int(id) for id in parent_ids.split(',')] - extra = match.group('extra').split('\t') - extra_dct = OrderedDict() +def GenomeDiffParser(fsock: Iterable[str]): + return (j for i, j in parse(fsock=fsock)) - for name in TYPE_SPECIFIC_FIELDS[type]: - value = extra.pop(0) - extra_dct[name] = self._convert_value(value) - for k, v in (e.split('=', 1) for e in extra): - extra_dct[k] = self._convert_value(v) +def load_json_ref_cov( + breseq_output_dir: "str|Path", relative_file: str = "output/summary.json" +): + import pandas as pd - yield Record(type, id, self._document, parent_ids, **extra_dct) - else: - raise Exception('Could not parse line #{}: {}'.format(i, line)) + breseq_json = ( + (Path(breseq_output_dir) / relative_file) + if relative_file + else breseq_output_dir + ) + with open(breseq_json) as ji: + j = json.load(ji) + df = ( + pd.DataFrame(j["references"]["reference"]) + .T.rename_axis(index="reference") + .assign(breseq_output_dir=breseq_json) + ) + return df diff --git a/src/genomediff/records.py b/src/genomediff/records.py index 4087207..6dbd24b 100644 --- a/src/genomediff/records.py +++ b/src/genomediff/records.py @@ -1,158 +1,618 @@ import re +from collections import OrderedDict +from enum import 
Enum +from typing import Final, Iterable, Literal, NamedTuple, TYPE_CHECKING -######### -## Fields should be kept synced with genomediff.cpp in the breseq source code - -TYPE_SPECIFIC_FIELDS = { - 'SNP': ('seq_id', 'position', 'new_seq'), - 'SUB': ('seq_id', 'position', 'size', 'new_seq'), - 'DEL': ('seq_id', 'position', 'size'), - 'INS': ('seq_id', 'position', 'new_seq'), - 'MOB': ('seq_id', 'position', 'repeat_name', 'strand', 'duplication_size'), - 'AMP': ('seq_id', 'position', 'size', 'new_copy_number'), - 'CON': ('seq_id', 'position', 'size', 'region'), - 'INV': ('seq_id', 'position', 'size'), - 'RA': ('seq_id', 'position', 'insert_position', 'ref_base', 'new_base'), - 'MC': ('seq_id', 'start', 'end', 'start_range', 'end_range'), - 'JC': ('side_1_seq_id', - 'side_1_position', - 'side_1_strand', - 'side_2_seq_id', - 'side_2_position', - 'side_2_strand', - 'overlap'), - 'CN': ('seq_id', 'start', 'end', 'copy_number'), - 'UN': ('seq_id', 'start', 'end'), - 'CURA': ('expert',), - 'FPOS': ('expert',), - 'PHYL': ('gd',), - 'TSEQ': ('seq_id', 'primer1_start', 'primer1_end', 'primer2_start', 'primer2_end'), - 'PFLP': ('seq_id', 'primer1_start', 'primer1_end', 'primer2_start', 'primer2_end'), - 'RFLP': ('seq_id', 'primer1_start', 'primer1_end', 'primer2_start', 'primer2_end', 'enzyme'), - 'PFGE': ('seq_id', 'restriction_enzyme'), - 'NOTE': ('note',), -} - -######### - -class Metadata(object): - def __init__(self, name, value): - self.name = name - self.value = value +STRAND_MAPPING_PATTERN: Final = re.compile(r"^(\d+)/(\d+)$") - def __repr__(self): - return "Metadata({}, {})".format(repr(self.name), repr(self.value)) - def __str__(self, other): - return "#={}\t{}".format(repr(self.name), repr(self.value)) +class ReadEvidenceStrand(NamedTuple): + pos: int = 0 + neg: int = 0 - def __eq__(self, other): - return self.__dict__ == other.__dict__ + def __str__(self): + return f"{self.pos}/{self.neg}" + + +def _convert_value(value: str): + if value == "." 
or value == "" or value is None: + return None + for type_ in (int, float): + try: + return type_(value) + except ValueError: + pass + if (match := STRAND_MAPPING_PATTERN.match(value)) is not None: + return ReadEvidenceStrand(int(match.group(1)), int(match.group(2))) + return value + + +class Condition: + class Comp(Enum): + # fmt: off + EQ = "=="; NE = "!="; LT = "<"; LE = "<="; GT = ">"; GE = ">=" + # fmt: on + + def __repr__(self): + return f"__{self.name.lower()}__" + + @classmethod + def PATTERN(cls): + return "|".join((i.value for i in cls.__members__.values())) + + def __init__(self, comp: Comp, val, default_key=""): + self.key = default_key + self.comp = comp + self.val = val + + def __str__(self): + return f"{self.key}{self.comp.value}{self.val}" + + CONDITION_PATTERN = re.compile( + r"^(?P([_a-z]+)?)\s*(?P" + + Comp.PATTERN() + + r")\s*(?P[-_a-zA-Z0-9\.]+)" + ) + + @classmethod + def parse(cls, condition: str): + condition_match = cls.CONDITION_PATTERN.match(condition) + if not condition_match: + cond_key = "" + cond_comp = "==" + cond_val = condition + else: + cond_key = condition_match.group("key") + cond_comp = condition_match.group("comp") + cond_val = condition_match.group("val") + return cls(cls.Comp(cond_comp), _convert_value(cond_val), cond_key) + + def __call__(self, val) -> bool: + return getattr(val, repr(self.comp))(self.val) + + def check_attr(self, val) -> bool: + return self(getattr(val, self.key)) + + +class RecordMeta(type): + types: dict[str, str] = {} + + def __init__(cls, name, bases, attrs): + super().__init__(name, bases, attrs) + if attrs.get("DataItem", NotImplemented) != NotImplemented: + cls.types[name] = bases[0].type_class + + +class _Record(metaclass=RecordMeta): + type_class: str = NotImplemented + + def __init__(self, evidence_id: str, parent_ids: str, extra: str, document=None): + """Data lines describe either a mutation or evidence from an analysis that can potentially support a mutational event. 
Data fields are tab-delimited. Each line begins with several fields containing information common to all types, continues with a fixed number of type-specific fields, and ends with an arbitrary number of name=value pairs that store optional information. + 1. **type** ** + type of the entry on this line. -class Record(object): - def __init__(self, type, id, document=None, parent_ids=None, **attributes): + 2. **id or evidence-id** ** + + For evidence and validation lines, the id of this item. For mutation lines, the ids of all evidence or validation items that support this mutation. May be set to '.' if a line was manually edited. + + 3. **parent-ids** ** + + ids of evidence that support this mutation. May be set to '.' or left blank. + """ + self.evidence_id = _convert_value(evidence_id) + self.parent_ids = ( + [ + parent_id + for parent_ids in parent_ids.split(",") + if (parent_id := _convert_value(parent_ids)) is not None + ] + if _convert_value(parent_ids) is not None + else [] + ) + self.parse_extra(extra) self.document = document - self.type = type - self.id = id - self.parent_ids = parent_ids - self.attributes = attributes + + DataItem = NotImplemented + + def load_data_item(self, extra: str): + _fields = self.DataItem._fields + extra_split = extra.split("\t") + self.dataitem = self.DataItem( # type: ignore[misc] + *(self.DataItem.__annotations__[k](v) for k, v in zip(_fields, extra_split)) + ) + return extra_split[len(_fields) :] + + def parse_extra(self, extra: str): + extra2dict = self.load_data_item(extra) + self.extra = OrderedDict( + (k, _convert_value(v)) + for k, v in (e.split("=", 1) for e in extra2dict if e) + ) + + @property + def type(self): + return self.__class__.__name__ + + @property + def id(self): + """alias""" + return self.evidence_id @property def parents(self): - if not self.parent_ids is None: + from . 
import GenomeDiff + + if isinstance(self.document, GenomeDiff): return [self.document[pid] for pid in self.parent_ids] - else: - return [] + return [] def __getattr__(self, item): - try: - return self.attributes[item] - except KeyError: - raise AttributeError + if item in self.dataitem._fields: + return self.dataitem.__getattribute__(item) + return self.extra[item] + @property + def attributes(self) -> dict: + dataitem = OrderedDict(zip(self.dataitem._fields, self.dataitem)) + return dataitem | self.extra def __repr__(self): - return "Record('{}', {}, {}, {})".format(self.type, - self.id, - self.parent_ids, - ', '.join('{}={}'.format(k, repr(v)) for k, v in self.attributes.items())) + return "Record('{}', {}, {}, {})".format( + self.type, + self.id, + self.parent_ids, + ", ".join(f"{k}={repr(v)}" for k, v in self.attributes.items()), + ) def __str__(self): + return "\t".join( + [ + self.type, + str(self.id), + " ".join([str(i) for i in self.parent_ids]), + ] + + [str(i) for i in self.dataitem] + + [f"{k}={v}" for k, v in self.extra.items()] + ) - parent_id_str='.' 
- if not self.parent_ids is None: - parent_id_str = ','.join(str(v) for v in self.parent_ids) - - ## Extract fields that are required (don't have field=value) - remaining_items = self.attributes.copy() - type_fields_key_list = TYPE_SPECIFIC_FIELDS[self.type] - type_fields_list = [] - for type_field_key in sorted(type_fields_key_list): - type_fields_list.append(str(self.attributes[type_field_key])) - del remaining_items[type_field_key] - type_fields_str = '\t'.join(type_fields_list) - - ## Remaining are printed as field=value - attribute_str = '\t'.join('{}={}'.format(k, str(v)) for k, v in remaining_items.items()) - - return '\t'.join([self.type, - str(self.id), - parent_id_str, - type_fields_str, - attribute_str - ]) + def to_dict(self) -> dict: + return {"type": self.type, "id": self.id} | self.attributes def __eq__(self, other): - ''' this definition allows identical mutations in different genome diffs - to be equal.''' - return self.type == other.type and self.attributes == other.attributes + if isinstance(other, self.__class__): + return ( + self.type == other.type + and self.id == other.id + and self.dataitem == other.dataitem + and self.extra == other.extra + ) + return False + + def __le__(self, other): + if isinstance(other, self.__class__): + return ( + self.type == other.type + and self.id == other.id + and self.dataitem == other.dataitem + and self.extra + == {k: v for k, v in other.extra.items() if k in self.to_dict()} + ) + return False - def __ne__(self, other): - return not self.__eq__(other) + def __lt__(self, other): + if isinstance(other, self.__class__): + return self <= other and self != other - def satisfies(self, *args): - ''' + def satisfy(self, *conds: "str|Condition", **kconds: "str|Condition"): + """ Input: a variable number of conditions, e.g. 'gene_name==rrlA','frequency>=0.9'. Output: return true if all conditions are true (i.e. correspond to key-values in attributes. Find a condition that evaluates to false, otherwise return True. 
- ''' - - ## helper function to check if values are numbers - def is_number(s): + """ + for condi in conds: + cond = condi if isinstance(condi, Condition) else Condition.parse(condi) try: - float(s) - return True - except ValueError: + attribute_val = getattr(self, cond.key) + except AttributeError: + continue + if not cond(attribute_val): return False - - for c in args: - assert type(c) == str, "error: supplied condition is not a string." - condition_pattern = re.compile(r'^(?P[_a-z]+)' - '(?P==|!=|<|<=|>|>=)' - '(?P[-_a-zA-Z0-9\\.]+)') - condition_match = condition_pattern.match(c) - assert condition_match, "the supplied condition\n"+c+"\n could not be parsed." - cond_key = condition_match.group('key') - cond_comp = condition_match.group('comp') - cond_val = condition_match.group('val') - - try: ## in case the given condition is not in the attributes. - attribute_val = self.attributes[cond_key] - except: + for key, condi in kconds.items(): + cond = condi if isinstance(condi, Condition) else Condition.parse(condi) + try: + attribute_val = getattr(self, key) + except AttributeError: continue - - ## add quote marks around strings before eval. can leave numbers alone. - if not is_number(cond_val): - cond_val = "\'"+cond_val+"\'" - - if not is_number(attribute_val): - attribute_val = "\'"+attribute_val+"\'" - else: ## attribute_val is a number in this record-- convert to str for eval. 
- attribute_val = str(attribute_val) - expr = attribute_val+cond_comp+cond_val - if not eval(expr): + if not cond(attribute_val): return False return True + + +# fmt: off +class RecordMutation (_Record): + type_class = "mutation" + def range(self): + return range(self.dataitem.position, self.dataitem.position + self.dataitem.size) + +class RecordEvidence (_Record): type_class = "evidence" +class RecordValidation(_Record): type_class = "validation" +# fmt: on + +if TYPE_CHECKING: + SeqStrand = Literal["+", "-"] +else: + + class SeqStrand(str): + ARGS = {"1": "+", "-1": "-"} + + def __new__(cls, value): + if value in cls.ARGS.values(): + return super().__new__(cls, value) + value = str(value) + if value in cls.ARGS: + return super().__new__(cls, cls.ARGS[value]) + raise ValueError(f"Invalid strand value: {value}") + + +# fmt: off +class SNP(RecordMutation): + """Base substitution mutation + - seq_id id of reference sequence fragment containing mutation, evidence, or validation. + - position position in reference sequence fragment of base to replace. + - new_seq new base at position. + """ + class DataItem(NamedTuple): seq_id: str; position: int; new_seq: str + dataitem: DataItem + def range(self): return range(self.dataitem.position, self.dataitem.position + 1) + # SNP("1", "1", "1\t1\t1").dataitem.new_seq +class SUB(RecordMutation): + """Multiple base substitution mutation + - seq_id + - position position in reference sequence of the first base that will be replaced. + - size number of bases *after* the specified reference position to replace with **new_seq**. + - new_seq new bases to substitute. + """ + class DataItem(NamedTuple): seq_id: str; position: int; size: int; new_seq: str + dataitem: DataItem +class DEL(RecordMutation): + """Deletion mutation + - seq_id + - position position in reference sequence fragment of first deleted base. + - size number of bases deleted in reference. 
+ + * **mediated=** ** + This deletion appears to be mediated by a molecular event involving a mobile element such as a transposon. A copy of the mobile element is found on the boundary of the deleted region and a new junction at the opposite end of the deletion matches the end of the mobile element. + * **between=** ** + This deletion appears to result from homologous recombination or polymerase slipping between two existing copies of the same genomic repeat (e.g. tRNA, IS element) in the genome. One copy of the repeat is deleted by this event. + * **repeat_seq=** **, **repeat_length=** **, **repeat_ref_num=** **, **repeat_new_copies=** ** + This deletion is in a short sequence repeat consisting of tandem copies of **repeat_seq** repeated **repeat_ref_num** times in the ancestor and **repeat_new_copies** after a mutation. To be annotated in this way the copy of the repeat in the reference genome must consist of at least two repeat copies and have a length of five of more total bases (**repeat_length** × **repeat_ref_num** ≥ 5). + """ + class DataItem(NamedTuple): seq_id: str; position: int; size: int + dataitem: DataItem +class INS(RecordMutation): + """Insertion mutation + - seq_id + - position position in reference sequence fragment. New bases are inserted *after* this position. + - new_seq new bases to be inserted in the reference. + + * **repeat_seq=** **, **repeat_length=** **, **repeat_ref_num=** **, **repeat_new_copies=** ** + This insertion is in a short sequence repeat consisting of tandem copies of **repeat_seq** repeated **repeat_ref_num** times in the ancestor and **repeat_new_copies** after a mutation. To be annotated in this way the copy of the repeat in the reference genome must consist of at least two repeat copies and have a length of five of more total bases (**repeat_length** × **repeat_ref_num** ≥ 5). + * **insert_position=** ** + Used when there are multiple insertion events after the same reference base to order the insertions. 
This typically happens in polymorphism mode and when manually breaking up an insertion of bases into distinct mutational events when this is supported by phylogenetic information. Numbering of insert positions begins with 1. + """ + class DataItem(NamedTuple): seq_id: str; position: int; new_seq: str + dataitem: DataItem + def range(self): return range(self.dataitem.position, self.dataitem.position + 1) +class MOB(RecordMutation): + """Mobile element insertion mutation + - seq_id + - position position in reference sequence fragment of the first duplicated base at the target site. + - repeat_name name of the mobile element. Should correspond to an annotated **repeat_region** or **mobile_element** feature in the reference sequence. + - strand strand of mobile element insertion. + - duplication_size number of target site bases duplicated during insertion of the mobile element, beginning with the specified reference position. If the value of this field is negative, then it indicates that the absolute value of this number of bases were deleted at the target site beginning with the specified position. If the value of this field is zero, then the there were no duplicated bases, and the mobile element was inserted after the specified base position. + + * **del_start=** **, **del_end=** ** + Delete this many bases from the start or end of the inserted mobile element. This deletion occurs with respect to the top strand of the genome after the element is flipped to the orientation with which it will be inserted. + * **ins_start=** **, **ins_end=** ** + Append the specified bases to the start or end of the inserted mobile element. These insertions occur after any deletions and will be inside of any duplicated target site bases. + * **mob_region** = ** + Use the existing copy of the mobile element specified as a seq_id:start-end region to apply this mutation. Useful when different annotated members of a mobile element family have slightly different sequences. 
+ """ + class DataItem(NamedTuple): seq_id: str; position: int; repeat_name: str; strand: SeqStrand; duplication_size: int + dataitem: DataItem + def range(self): raise NotImplementedError +class AMP(RecordMutation): + """Amplification mutation + - seq_id + - position position in reference sequence fragment. + - size number of bases duplicated starting with the specified reference position. + - new_copy_number new number of copies of specified bases. + + * **between=** ** + This amplification appears to result from homologous recombination or polymerase slipping between two existing copies of the same genomic repeat (e.g. tRNA, IS element) in the genome. This repeat appears on the boundary of each copy of the specified region. + * **mediated=** **, *mediated_strand=** *<1/-1>* + This amplification is mediated by a simultaneous new insertion of a mobile element (or other repeat element). New copies of the inserted element are added in the specified strand orientation between each new copy of the amplified region. Both of these attributes must be specified for the mutation. + * **mob_region** = ** + Only valid for 'mediated' amplifications. Use the existing copy of the mobile element specified as a seq_id:start-end region to apply this mutation. Useful when different annotated members of a mobile element family have slightly different sequences. + """ + class DataItem(NamedTuple): seq_id: str; position: int; size: int; new_copy_number: int + dataitem: DataItem +class CON(RecordMutation): + """Gene conversion mutation + - seq_id + - position position in reference sequence fragment that was the target of gene conversion from another genomic location. + - size number of bases to replace in the reference genome beginning at the specified position. + - region Region in the reference genome to use as a replacement. 
+ """ + class DataItem(NamedTuple): seq_id: str; position: int; size: int; region: str + dataitem: DataItem +class INV(RecordMutation): + """Inversion mutation + - seq_id + - position position in reference sequence fragment. + - size number of bases in inverted region beginning at the specified reference position. + """ + class DataItem(NamedTuple): seq_id: str; position: int; size: int + dataitem: DataItem + + +class RA(RecordEvidence): + """Read alignment evidence + - seq_id + - position position in reference sequence fragment. + - insert_position number of bases inserted after the reference position to get to this base. A value of zero refers to the base. A value of 5 means that this evidence is for the fifth newly inserted column after the reference position. + - ref_base base in the reference genome. + - new_base new base supported by read alignment evidence. + """ + class DataItem(NamedTuple): seq_id: str; position: int; insert_position: int; ref_base: str; new_base: str + dataitem: DataItem +class MC(RecordEvidence): + """Missing coverage evidence + - seq_id + - start start position in reference sequence fragment. + - end end position in reference sequence of region. + - start_range number of bases to offset *after* the **start position** to define the upper limit of the range where the start of a deletion could be. + - end_range number of bases to offset *before* the **end position** to define the lower limit of the range where the start of a deletion could be. + """ + class DataItem(NamedTuple): seq_id: str; start: int; end: int; start_range: int; end_range: int + dataitem: DataItem +class JC(RecordEvidence): + """New junction evidence + - side_1_seq_id id of reference sequence fragment containing side 1 of the junction. + - side_1_position position of side 1 at the junction boundary. + - side_1_strand direction that side 1 continues matching the reference sequence. + - side_2_seq_id id of reference sequence fragment containing side 2 of the junction. 
+ - side_2_position position of side 2 at the junction boundary. + - side_2_strand direction that side 2 continues matching the reference sequence. + - overlap Number of bases that the two sides of the new junction have in common. + """ + class DataItem(NamedTuple): side_1_seq_id: str; side_1_position: int; side_1_strand: SeqStrand; side_2_seq_id: str; side_2_position: int; side_2_strand: SeqStrand; overlap: int + dataitem: DataItem +class CN(RecordEvidence): + """Copy number variation evidence + - seq_id + - start start position in reference sequence fragment. + - end end position in reference sequence of region. + - copy_number number of copies of the region in the reference genome. + """ + class DataItem(NamedTuple): seq_id: str; start: int; end: int; copy_number: int + dataitem: DataItem +class UN(RecordEvidence): + """Unknown base evidence + - seq_id + - start start position in reference sequence of region. + - end end position in reference sequence of region. + """ + class DataItem(NamedTuple): seq_id: str; start: int; end: int + dataitem: DataItem + + +class CURA(RecordValidation): + class DataItem(NamedTuple): expert: str + dataitem: DataItem +class FPOS(RecordValidation): + class DataItem(NamedTuple): expert: str + dataitem: DataItem +class PHYL(RecordValidation): + class DataItem(NamedTuple): gd: str + dataitem: DataItem +class TSEQ(RecordValidation): + class DataItem(NamedTuple): seq_id: str; primer1_start: int; primer1_end: int; primer2_start: int; primer2_end: int + dataitem: DataItem +class PFLP(RecordValidation): + class DataItem(NamedTuple): seq_id: str; primer1_start: int; primer1_end: int; primer2_start: int; primer2_end: int + dataitem: DataItem +class RFLP(RecordValidation): + class DataItem(NamedTuple): seq_id: str; primer1_start: int; primer1_end: int; primer2_start: int; primer2_end: int; enzyme: str + dataitem: DataItem +class PFGE(RecordValidation): + class DataItem(NamedTuple): seq_id: str; restriction_enzyme: str + dataitem: DataItem 
+class NOTE(RecordValidation): + class DataItem(NamedTuple): note: str + dataitem: DataItem + + +# fmt: on +class RecordEnum(Enum): + # fmt: off + SNP = SNP; SUB = SUB; DEL = DEL; INS = INS; MOB = MOB; AMP = AMP; CON = CON; INV = INV + RA = RA; MC = MC; JC = JC; CN = CN; UN = UN + CURA = CURA; FPOS = FPOS; PHYL = PHYL; TSEQ = TSEQ; PFLP = PFLP; RFLP = RFLP; PFGE = PFGE; NOTE = NOTE + # fmt: on + + @classmethod + def parse( + cls, + record_type: str, + evidence_id: "str|int", + document=None, + parent_ids: "str|list[int]|None" = ".", + **kwargs, + ): + """ + Record("SNP", 1, parent_ids=[23423], new_seq="A", seq_id="NC_000913", position=223, gene_name="mhpE") + => + SNP("1", "23423", "NC_000913\t223\tA\tgene_name=mhpE") + """ + if isinstance(parent_ids, list): + parent_ids = ",".join(str(i) for i in parent_ids) + RE = cls[record_type].value + pre = "\t".join((f"{kwargs[k]}" for k in RE.DataItem._fields)) + extra = "\t".join( + f"{k}={v}" for k, v in kwargs.items() if k not in RE.DataItem._fields + ) + return RE(evidence_id, parent_ids or "", f"{pre}\t{extra}", document=document) + + +Record = RecordEnum.parse + + +DATA2RECORD: dict[str, list[str]] = {} +for rtype, dtype in RecordMeta.types.items(): + DATA2RECORD.setdefault(dtype, []).append(rtype) +del rtype, dtype + + +class RecordCollection: + def __init__(self): + self.SNP: list[SNP] = [] + self.SUB: list[SUB] = [] + self.DEL: list[DEL] = [] + self.INS: list[INS] = [] + self.MOB: list[MOB] = [] + self.AMP: list[AMP] = [] + self.CON: list[CON] = [] + self.INV: list[INV] = [] + self.RA: list[RA] = [] + self.MC: list[MC] = [] + self.JC: list[JC] = [] + self.CN: list[CN] = [] + self.UN: list[UN] = [] + self.CURA: list[CURA] = [] + self.FPOS: list[FPOS] = [] + self.PHYL: list[PHYL] = [] + self.TSEQ: list[TSEQ] = [] + self.PFLP: list[PFLP] = [] + self.RFLP: list[RFLP] = [] + self.PFGE: list[PFGE] = [] + self.NOTE: list[NOTE] = [] + + self.index: dict["str|int|float", _Record] = {} + self.unindex: dict[str, 
list[_Record]] = {} + + @classmethod + def new(cls): + return cls() # type: ignore + + def set(self, record: _Record): + assert record.type in RecordMeta.types + getattr(self, record.type).append(record) + if record.id in self.index: + self.unindex.setdefault(record.id, []).append(self.index[record.id]) + if record.id in self.unindex or _convert_value(record.id) is None: + self.unindex.setdefault(record.id, []).append(record) + else: + self.index[record.id] = record + + def parents_of(self, record: "_Record|str|int|float"): + if not isinstance(record, _Record): + record = self[record] + assert isinstance(record, _Record) + valid_pids = [ + pid for pid in record.parent_ids if _convert_value(pid) is not None + ] + return [ + *(self.index[pid] for pid in valid_pids if pid in self.index), + *(i for pid in valid_pids for i in self.unindex.get(pid, ())), + ] + + @property + def mutation(self) -> Iterable[RecordMutation]: + return (i for rtype in DATA2RECORD["mutation"] for i in getattr(self, rtype)) + + @property + def evidence(self) -> Iterable[RecordEvidence]: + return (i for rtype in DATA2RECORD["evidence"] for i in getattr(self, rtype)) + + @property + def validation(self) -> Iterable[RecordValidation]: + return (i for rtype in DATA2RECORD["validation"] for i in getattr(self, rtype)) + + def __getitem__(self, item): + try: + return self.index[float(item)] + except Exception: + pass + return self.index[item] + + def __len__(self): + return sum(len(getattr(self, rtype)) for rtype in RecordMeta.types) + + def __iter__(self): + return (i for j in (self.mutation, self.evidence, self.validation) for i in j) + + def __str__(self): + return "\n".join( + [ + "MUTATION:", + "\n".join([str(x) for x in self.mutation]), + "EVIDENCE:", + "\n".join([str(x) for x in self.evidence]), + "VALIDATION:", + "\n".join([str(x) for x in self.validation]), + ] + ) + + def remove( + self, + *conds: "str|Condition", + mut_type: "RecordEnum|None|str" = None, + **kconds: "str|Condition", + ): + """ + 
Remove mutations that satisfy the given conditions. Implementation of + gdtools REMOVE for genomediff objects. + + Input: a variable number of conditions, e.g. 'gene_name==rrlA','frequency>=0.9'. + If mut_type is specified, only that mutation type will be removed. + Output: self.mutations is updated, with mutations satisfying the conditions + having been removed. + """ + if isinstance(mut_type, RecordEnum): + rec: RecordMutation + updated_mutations = [] + for rec in getattr(self, mut_type.name): + if rec.satisfy(*conds, **kconds): + if (_rec := self.index.pop(rec.id, None)) is not None: + self.unindex.setdefault(rec.id, []).append(_rec) + else: + updated_mutations.append(rec) + setattr(self, mut_type.name, updated_mutations) + elif mut_type is None: + for mut_type in DATA2RECORD["mutation"]: + self.remove(*conds, mut_type=RecordEnum[mut_type], **kconds) + else: + mut_type = RecordEnum[mut_type] + self.remove(*conds, mut_type=mut_type, **kconds) + + def query( + self, + *conds: "str|Condition", + mut_type: "RecordEnum|None|str" = None, + **kconds: "str|Condition", + ): + if isinstance(mut_type, RecordEnum): + rec: RecordMutation + for rec in getattr(self, mut_type.name): + if rec.satisfy(*conds, **kconds): + yield rec + elif mut_type is None: + for mut_type in DATA2RECORD["mutation"]: + yield from self.query(*conds, mut_type=RecordEnum[mut_type], **kconds) + else: + mut_type = RecordEnum[mut_type] + yield from self.query(*conds, mut_type=mut_type, **kconds) diff --git a/tests.py b/tests.py index 95f5ca0..dd7ecef 100644 --- a/tests.py +++ b/tests.py @@ -1,5 +1,20 @@ -from unittest import TestCase, main +# -*- coding: utf-8 -*- +""" +* @Date: 2025-01-11 11:57:00 +* @LastEditors: hwrn hwrn.aou@sjtu.edu.cn +* @LastEditTime: 2025-02-12 11:36:43 +* @FilePath: /pymummer/tests/test_genomediff.py +* @Description: + +test: + PYTHONPATH=. 
python tests/test_genomediff.py +or: + python -m pytest +""" +# """ + from io import StringIO +from unittest import TestCase, main from genomediff import Metadata, GenomeDiff from genomediff.parser import GenomeDiffParser @@ -8,145 +23,215 @@ class ParserTestCase(TestCase): def test_parse(self): - file = StringIO(""" -#=GENOME_DIFF 1.0 -#=AUTHOR test -SNP 1 23423 NC_000913 223 A gene_name=mhpE -RA 2 NC_000913 223 0 G A frequency=0.1366 - """.strip()) + file = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=AUTHOR test", + "SNP\t1\t23423\tNC_000913\t223\tA\tgene_name=mhpE", + "RA\t2\t\tNC_000913\t223\t0\tG\tA\tfrequency=0.1366", + ] + ) + ) p = GenomeDiffParser(fsock=file) - self.assertEqual([ - Metadata('GENOME_DIFF', '1.0'), - Metadata('AUTHOR', 'test'), - Record('SNP', 1, parent_ids=[23423], new_seq='A', seq_id='NC_000913', position=223, gene_name='mhpE'), - Record('RA', 2, new_base='A', frequency=0.1366, position=223, seq_id='NC_000913', - insert_position=0, - ref_base='G')], - list(p) + # fmt: off + + self.assertEqual( + [ + Metadata("GENOME_DIFF", "1.0"), + Metadata("AUTHOR", "test"), + Record("SNP", 1, parent_ids=[23423], new_seq="A", seq_id="NC_000913", position=223, gene_name="mhpE"), + Record("RA", 2, new_base="A", frequency=0.1366, position=223, seq_id="NC_000913", insert_position=0, ref_base="G"), + ], + list(p), ) + # fmt: on def test_parse_dot_missing_parent_ids(self): - file = StringIO(""" -#=GENOME_DIFF 1.0 -#=AUTHOR test -SNP 1 23423 NC_000913 223 A gene_name=mhpE -RA 2 . 
NC_000913 223 0 G A frequency=0.1366 - """.strip()) + file = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=AUTHOR test", + "SNP\t1\t23423\tNC_000913\t223\tA\tgene_name=mhpE", + "RA\t2\t.\tNC_000913\t223\t0\tG\tA\tfrequency=0.1366", + ] + ) + ) p = GenomeDiffParser(fsock=file) - self.assertEqual([ - Metadata('GENOME_DIFF', '1.0'), - Metadata('AUTHOR', 'test'), - Record('SNP', 1, parent_ids=[23423], new_seq='A', seq_id='NC_000913', position=223, gene_name='mhpE'), - Record('RA', 2, new_base='A', frequency=0.1366, position=223, seq_id='NC_000913', - insert_position=0, - ref_base='G')], - list(p) + # fmt: off + self.assertEqual( + [ + Metadata('GENOME_DIFF', '1.0'), + Metadata('AUTHOR', 'test'), + Record('SNP', 1, parent_ids=[23423], new_seq='A', seq_id='NC_000913', position=223, gene_name='mhpE'), + Record('RA', 2, new_base='A', frequency=0.1366, position=223, seq_id='NC_000913', insert_position=0, ref_base='G') + ], + list(p) ) + # fmt: on class GenomeDiffTestCase(TestCase): def test_document(self): - file = StringIO(""" -#=GENOME_DIFF 1.0 -#=AUTHOR test -SNP 1 23423 NC_000913 223 A -RA 2 NC_000913 223 0 G A - """.strip()) - + file = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=AUTHOR test", + "SNP\t1\t23423\tNC_000913\t223\tA", + "RA\t2\t\tNC_000913\t223\t0\tG\tA", + ] + ) + ) document = GenomeDiff.read(file) + # fmt: off + self.assertEqual({'AUTHOR': ['test'], 'GENOME_DIFF': ['1.0']}, document.metadata._dict) self.assertEqual({'AUTHOR': 'test', 'GENOME_DIFF': '1.0'}, document.metadata) - snp_record = Record('SNP', 1, document, [23423], seq_id='NC_000913', new_seq='A', position=223) - ra_record = Record('RA', 2, document, None, position=223, seq_id='NC_000913', insert_position=0, new_base='A', - ref_base='G') + snp_record = Record('SNP', 1, [23423], seq_id='NC_000913', new_seq='A', position=223) + ra_record = Record('RA', 2, None, position=223, seq_id='NC_000913', insert_position=0, new_base='A', ref_base='G') + # fmt: on 
self.assertEqual([snp_record], document.mutations) self.assertEqual([ra_record], document.evidence) self.assertEqual(snp_record, document[1]) self.assertEqual(ra_record, document[2]) + def test_query(self): + file = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=AUTHOR test", + "SNP\t1\t23423\tNC_000913\t223\tA", + "RA\t2\t\tNC_000913\t223\t0\tG\tA", + ] + ) + ) + document = GenomeDiff.read(file) + + snp_record = Record( + "SNP", 1, [23423], seq_id="NC_000913", new_seq="A", position=223 + ) + ra_record = Record( + "RA", + 2, + None, + position=223, + seq_id="NC_000913", + insert_position=0, + new_base="A", + ref_base="G", + ) + + self.assertEqual([snp_record], list(document.records.query(type="SNP"))) + self.assertEqual([snp_record], list(document.records.query(mut_type="SNP"))) + self.assertEqual([snp_record], list(document.records.query(type="==SNP"))) + self.assertEqual([snp_record], list(document.records.query("type==SNP"))) + self.assertEqual( + [ra_record], list(document.records.query("position>0", mut_type="RA")) + ) + class RecordTestCase(TestCase): def test_simple(self): + # fmt: off snp_record = Record('SNP', 1, parent_ids=[23423], seq_id='NC_000913', new_seq='A', position=223, test='more') self.assertEqual('SNP', snp_record.type) self.assertEqual(1, snp_record.id) self.assertEqual('A', snp_record.new_seq) self.assertEqual('more', snp_record.test) + # fmt: on class ParentResolveTestCase(TestCase): def test_resolve(self): - file = StringIO(""" -#=GENOME_DIFF 1.0 -#=AUTHOR test -SNP 1 2 NC_000913 223 A -RA 2 NC_000913 223 0 G A - """.strip()) + file = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=AUTHOR test", + "SNP\t1\t2\tNC_000913\t223\tA\tgene_name=mhpE", + "RA\t2\t\tNC_000913\t223\t0\tG\tA\tfrequency=0.1366", + ] + ) + ) document = GenomeDiff.read(file) + self.assertEqual(document.records.parents_of(1), [document[2]]) self.assertEqual(document[1].parents, [document[2]]) + class RecordComparisonTestCase(TestCase): def test_cmp1(self): 
- file1 = StringIO(""" -#=GENOME_DIFF 1.0 -#=CREATED 20:02:17 23 Jan 2019 -#=PROGRAM breseq 0.33.2 -#=COMMAND breseq -r LCA.gff3 sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R1.fastq.gz sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R2.fastq.gz sequence-data/ZDBp889_reads.fastq -o consensus/ZDBp889 -#=REFSEQ LCA.gff3 -#=READSEQ sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R1.fastq.gz -#=READSEQ sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R2.fastq.gz -#=READSEQ sequence-data/ZDBp889_reads.fastq -#=CONVERTED-BASES 644779377 -#=CONVERTED-READS 14448149 -#=INPUT-BASES 645034321 -#=INPUT-READS 14455411 -#=MAPPED-BASES 602854657 -#=MAPPED-READS 13788351 -SNP 1 34 REL606 72313 C - """.strip()) - + file1 = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=CREATED\t20:02:17 23 Jan 2019", + "#=PROGRAM\tbreseq 0.33.2", + "#=COMMAND\tbreseq -r LCA.gff3 sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R1.fastq.gz sequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R2.fastq.gz sequence-data/ZDBp889_reads.fastq -o consensus/ZDBp889", + "#=REFSEQ\tLCA.gff3", + "#=READSEQ\tsequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R1.fastq.gz", + "#=READSEQ\tsequence-data/DM0 evolved re-runs (Rohan)/ZDBp889_R2.fastq.gz", + "#=READSEQ\tsequence-data/ZDBp889_reads.fastq", + "#=CONVERTED-BASES\t644779377", + "#=CONVERTED-READS\t14448149", + "#=INPUT-BASES\t645034321", + "#=INPUT-READS\t14455411", + "#=MAPPED-BASES\t602854657", + "#=MAPPED-READS\t13788351", + "SNP\t1\t34\tREL606\t72313\tC", + ] + ) + ) document1 = GenomeDiff.read(file1) - - file2 = StringIO(""" -#=GENOME_DIFF 1.0 -#=CREATED 16:49:49 23 Jan 2019 -#=PROGRAM breseq 0.33.2 -#=COMMAND breseq -r LCA.gff3 sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R1.fastq.gz sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R2.fastq.gz -o consensus/ZDB67 -#=REFSEQ LCA.gff3 -#=READSEQ sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R1.fastq.gz -#=READSEQ sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R2.fastq.gz 
-#=CONVERTED-BASES 114566968 -#=CONVERTED-READS 419781 -#=INPUT-BASES 114567554 -#=INPUT-READS 419783 -#=MAPPED-BASES 92472620 -#=MAPPED-READS 339813 -SNP 1 12 REL606 72313 C - """.strip()) - + file2 = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "#=CREATED\t16:49:49 23 Jan 2019", + "#=PROGRAM\tbreseq 0.33.2", + "#=COMMAND\tbreseq -r LCA.gff3 sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R1.fastq.gz sequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R2.fastq.gz -o consensus/ZDB67", + "#=REFSEQ\tLCA.gff3", + "#=READSEQ\tsequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R1.fastq.gz", + "#=READSEQ\tsequence-data/DM0 evolved re-runs (Rohan)/ZDB67_R2.fastq.gz", + "#=CONVERTED-BASES\t114566968", + "#=CONVERTED-READS\t419781", + "#=INPUT-BASES\t114567554", + "#=INPUT-READS\t419783", + "#=MAPPED-BASES\t92472620", + "#=MAPPED-READS\t339813", + "SNP\t1\t12\tREL606\t72313\tC", + ] + ) + ) document2 = GenomeDiff.read(file2) - self.assertEqual(document1.mutations,document2.mutations) - + self.assertEqual(document1.mutations, document2.mutations) def test_cmp2(self): - file1 = StringIO(""" -#=GENOME_DIFF 1.0 -SNP 1 12 REL606 72313 C aa_new_seq=G aa_position=92 aa_ref_seq=D codon_new_seq=GGC codon_number=92 codon_position=2 codon_ref_seq=GAC gene_name=araA gene_position=275 gene_product=L-arabinose isomerase gene_strand=< genes_overlapping=araA locus_tag=ECB_00064 locus_tags_overlapping=ECB_00064 mutation_category=snp_nonsynonymous position_end=72313 position_start=72313 snp_type=nonsynonymous transl_table=11 - """.strip()) - + file1 = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "SNP\t1\t12\tREL606\t72313\tC\taa_new_seq=G\taa_position=92\taa_ref_seq=D\tcodon_new_seq=GGC\tcodon_number=92\tcodon_position=2\tcodon_ref_seq=GAC\tgene_name=araA\tgene_position=275\tgene_product=L-arabinose 
isomerase\tgene_strand=<\tgenes_overlapping=araA\tlocus_tag=ECB_00064\tlocus_tags_overlapping=ECB_00064\tmutation_category=snp_nonsynonymous\tposition_end=72313\tposition_start=72313\tsnp_type=nonsynonymous\ttransl_table=11", + ] + ) + ) document1 = GenomeDiff.read(file1) - - file2 = StringIO(""" -#=GENOME_DIFF 1.0 -SNP 1 34 REL606 72313 C aa_new_seq=G aa_position=92 aa_ref_seq=D codon_new_seq=GGC codon_number=92 codon_position=2 codon_ref_seq=GAC gene_name=araA gene_position=275 gene_product=L-arabinose isomerase gene_strand=< genes_overlapping=araA locus_tag=ECB_00064 locus_tags_overlapping=ECB_00064 mutation_category=snp_nonsynonymous position_end=72313 position_start=72313 snp_type=nonsynonymous transl_table=11 - """.strip()) - + file2 = StringIO( + "\n".join( + [ + "#=GENOME_DIFF\t1.0", + "SNP\t1\t34\tREL606\t72313\tC\taa_new_seq=G\taa_position=92\taa_ref_seq=D\tcodon_new_seq=GGC\tcodon_number=92\tcodon_position=2\tcodon_ref_seq=GAC\tgene_name=araA\tgene_position=275\tgene_product=L-arabinose isomerase\tgene_strand=<\tgenes_overlapping=araA\tlocus_tag=ECB_00064\tlocus_tags_overlapping=ECB_00064\tmutation_category=snp_nonsynonymous\tposition_end=72313\tposition_start=72313\tsnp_type=nonsynonymous\ttransl_table=11", + ] + ) + ) document2 = GenomeDiff.read(file2) - self.assertEqual(document1.mutations,document2.mutations) + self.assertEqual(document1.mutations, document2.mutations) + - -if __name__ == '__main__': +if __name__ == "__main__": main()