Skip to content

Commit

Permalink
Merge pull request #67 from UCLOrengoGroup/process-pdbdir-globularity
Browse files Browse the repository at this point in the history
Process globularity from directory of PDB files (without chopping)
  • Loading branch information
sillitoe authored Nov 22, 2023
2 parents 418b900 + 227ab42 commit d85a722
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 69 deletions.
173 changes: 130 additions & 43 deletions cath_alphaflow/commands/measure_globularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cath_alphaflow.io_utils import get_pdb_structure
from cath_alphaflow.models.domains import GeneralDomainID
from cath_alphaflow.models.domains import Chopping
from cath_alphaflow.models.domains import Segment

from cath_alphaflow.constants import DEFAULT_GLOB_DISTANCE, DEFAULT_GLOB_VOLUME

Expand All @@ -25,7 +26,7 @@
@click.option(
"--consensus_domain_list",
type=click.File("rt"),
required=True,
required=False,
help="Input: CSV file containing consensus domains",
)
@click.option(
Expand Down Expand Up @@ -71,24 +72,12 @@ def measure_globularity(
):
"Checks the globularity of the AF domain"

consensus_domain_list_reader = get_csv_dictreader(
consensus_domain_list,
fieldnames=[
"domain_id",
"domain_md5",
"consensus_level",
"chopping",
"domain_nres",
"no_segments",
"plddt",
"total_ss",
"no_helix",
"no_strand",
"no_helix+strand",
"no_turn",
"proteome_id",
],
)
if consensus_domain_list:
general_domain_provider = yield_domain_from_consensus_domain_list(
consensus_domain_list
)
else:
general_domain_provider = yield_domain_from_pdbdir(pdb_dir)

globularity_writer = get_csv_dictwriter(
domain_globularity,
Expand All @@ -107,12 +96,7 @@ def measure_globularity(
f"out_file={domain_globularity.name} ) ..."
)

for domain_row in consensus_domain_list_reader:
domain_id = GeneralDomainID(
raw_id=domain_row["domain_id"],
chopping=Chopping.from_str(domain_row["chopping"]),
)

for domain_id in general_domain_provider():
LOG.debug(f"Working on: {domain_id} ...")

model_structure = get_pdb_structure(domain_id, pdb_dir, chains_are_gzipped)
Expand All @@ -121,7 +105,7 @@ def measure_globularity(
check_domain_chopping_matches_model_residues(domain_id, model_structure)
except Exception as e:
LOG.warning(
f"residue mismatch for {domain_id} (model={model_structure}): {e.message}"
f"residue mismatch for {domain_id} (model={model_structure}): {str(e)}"
)

domain_packing_density = calculate_packing_density(
Expand Down Expand Up @@ -167,10 +151,14 @@ def check_domain_chopping_matches_model_residues(

def calculate_packing_density(
domain_id: GeneralDomainID,
chain_structure: Structure,
model_structure: Structure,
distance_cutoff: int,
) -> float:
domain_residue_numbers = domain_id.get_domain_residues_from_numeric_chopping()
chain_residues = list(model_structure.get_chains())[0].get_residues()
if domain_id.chopping:
target_residues = domain_id.chopping.filter_bio_residues(chain_residues)
else:
target_residues = chain_residues

# List of hydrophobic residues
hydrophobic_list = [
Expand All @@ -187,28 +175,26 @@ def calculate_packing_density(

neighbor_list_protein = []
# Get the number of nearby residues for all hydrophobic residues in the domain
atom_list = [atom for atom in chain_structure.get_atoms()]
for residue_number in domain_residue_numbers:
residue = chain_structure[0]["A"][residue_number]
if residue.get_resname() in hydrophobic_list and not residue.is_disordered():
center_atoms = Selection.unfold_entities(residue, "A")
atom_list = [atom for atom in model_structure.get_atoms()]
for res in target_residues:
if res.get_resname() in hydrophobic_list and not res.is_disordered():
center_atoms = Selection.unfold_entities(res, "A")
ns = NeighborSearch(atom_list)

nearby_residues = {
residue
_res
for center_atom in center_atoms
for residue in ns.search(center_atom.coord, distance_cutoff, "R")
for _res in ns.search(center_atom.coord, distance_cutoff, "R")
}
neighbor_list_protein.append(len(nearby_residues))

return round(np.mean(neighbor_list_protein), 3)


# Function to get the approximated Volume of the domain
def get_volume(structure, domain_residue_numbers, volume_resolution):
def get_volume(residues, volume_resolution):
reduced_coords = list()
for residue_number in domain_residue_numbers:
residue = structure[0]["A"][residue_number]
for residue in residues:
for atm in residue.get_atoms():
x_reduced = int(atm.coord.tolist()[0] / volume_resolution)
y_reduced = int(atm.coord.tolist()[1] / volume_resolution)
Expand All @@ -227,14 +213,16 @@ def calculate_normed_radius_of_gyration(
model_structure: Structure,
volume_resolution: int,
) -> float:
domain_residue_numbers = domain_id.get_domain_residues_from_numeric_chopping()
chain_residues = list(model_structure.get_chains())[0].get_residues()
if domain_id.chopping:
target_residues = domain_id.chopping.filter_bio_residues(chain_residues)
else:
target_residues = chain_residues

coords = []
masses = []
# Fill lists for the coords and masses for all atoms in the domain
for residue_number in domain_residue_numbers:
residue = model_structure[0]["A"][residue_number]

for residue in target_residues:
for atom in residue:
coords.append(atom.coord.tolist())
if atom.element == "C":
Expand Down Expand Up @@ -262,9 +250,108 @@ def calculate_normed_radius_of_gyration(
radius_gyration = np.sqrt(r_tmp / total_mass - m_tmp)

# Calculate the radius of gyration for a sphere with the same volume as the protein
volume = get_volume(model_structure, domain_residue_numbers, volume_resolution)
volume = get_volume(target_residues, volume_resolution)
radius = np.cbrt(volume) * 3 / 4 * np.pi
radius_gyration_sphere = np.sqrt(3 / 5 * radius**2)

# Return normed radius of gyration
return round(radius_gyration / radius_gyration_sphere, 3)


def guess_chopping_from_pdb_file(
pdbfile, target_chain_id=None, assume_all_atom_breaks_are_segments=True
) -> Chopping:
"""
Reverse engineer a `Chopping` object from a PDB file
'Real' PDB files can have weird residue numberings, so it is not possible to
reliably know whether a PDB file has been chopped by an external process
(i.e. a CATH domain) or whether the numbering in the original PDB is just weird.
The best we can do is guess, i.e. check for cases where the sequential atom number
is not contiguous.
If `assume_all_atom_breaks_are_segments` is turned off then we just create a chopping
with a single segment that spans the entire PDB file.
"""

pdbpath = Path(str(pdbfile))
segments = []
start_res = None
end_res = None
last_atom_num = None
last_res_label = None
with pdbpath.open("rt") as fh:
for line in fh:
if not line.startswith("ATOM"):
continue

atom_num = int(line[6:11])
chain_id = line[21]
res_label = line[22:27].strip() # includes the (optional) insert code

# if we haven't specified a chain, then we take the first one we see
if not target_chain_id:
target_chain_id = chain_id

if not last_atom_num:
last_atom_num = atom_num
if not last_res_label:
last_res_label = res_label

if chain_id != target_chain_id:
continue

if not start_res:
start_res = res_label
end_res = res_label

if assume_all_atom_breaks_are_segments:
if res_label != last_res_label and atom_num != last_atom_num + 1:
segments.append(Segment(start=start_res, end=last_res_label))
start_res = None
end_res = None

last_res_label = res_label
last_atom_num = atom_num

if start_res and end_res:
segments.append(Segment(start=start_res, end=end_res))

return Chopping(segments=segments)


def yield_domain_from_pdbdir(pdbdir, pdb_suffix=".pdb") -> GeneralDomainID:
pdbdir = Path(str(pdbdir))
for pdbpath in pdbdir.iterdir():
if not str(pdbpath).endswith(pdb_suffix):
continue
model_id = pdbpath.stem
yield GeneralDomainID(domain_id=model_id, chopping=None)


def yield_domain_from_consensus_domain_list(consensus_domain_list) -> GeneralDomainID:
consensus_domain_list_reader = get_csv_dictreader(
consensus_domain_list,
fieldnames=[
"domain_id",
"domain_md5",
"consensus_level",
"chopping",
"domain_nres",
"no_segments",
"plddt",
"total_ss",
"no_helix",
"no_strand",
"no_helix+strand",
"no_turn",
"proteome_id",
],
)

for domain_row in consensus_domain_list_reader:
domain = GeneralDomainID(
model_id=domain_row["domain_id"],
chopping=Chopping.from_str(domain_row["chopping"]),
)
yield domain
10 changes: 7 additions & 3 deletions cath_alphaflow/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,22 @@ def chunked_iterable(iterable, *, chunk_size):


def get_pdb_structure(
model_id, chain_pdb_dir, chains_are_gzipped=False, model_filename=None
model_id,
chain_pdb_dir,
chains_are_gzipped=False,
model_filename=None,
pdb_suffix=".pdb",
) -> Structure:
"""Return a Bio.PDB.Structure for a given domain (model_id / chopping)"""

# create default filename
if model_filename is None:
model_filename = model_id.raw_id + pdb_suffix
if not chains_are_gzipped:
open_func = open
model_filename = model_id.raw_id + ".pdb"
else:
open_func = gzip.open
model_filename = model_id.raw_id + ".pdb.gz"
model_filename += ".gz"

pdb_path = Path(chain_pdb_dir, model_filename)

Expand Down
36 changes: 33 additions & 3 deletions cath_alphaflow/models/domains.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import re
from typing import List, Callable
from typing import List, Callable, Tuple
from dataclasses import dataclass, asdict
from pydantic import BaseModel as PyBaseModel, ConfigDict

Expand Down Expand Up @@ -159,6 +159,33 @@ def from_str(cls, chopping_str: str):
segs.append(seg)
return Chopping(segments=segs)

def filter_bio_residues(self, residues: List[Tuple[str, int, str]]):
"""
Filter a list of Bio.PDB residues to only include those that are within this chopping
"""
filtered_residues = []
segments = self.segments
segment_count = 0
current_segment = segments[segment_count]
in_segment = False
for res in residues:
res_id = res.id[1]
res_ins = res.id[2]
res_label = str(str(res_id) + res_ins).strip()
if not in_segment and str(res_label) == str(current_segment.start):
in_segment = True
if in_segment:
filtered_residues.append(res)
if in_segment and str(res_label) == str(current_segment.end):
in_segment = False
segment_count += 1
if segment_count < len(segments):
current_segment = segments[segment_count]
else:
# no more segments left
break
return filtered_residues

def to_str(self):
return "_".join([f"{seg.start}-{seg.end}" for seg in self.segments])

Expand Down Expand Up @@ -224,7 +251,7 @@ def deep_copy(self):
@dataclass
class GeneralDomainID:
raw_id: str
chopping: Chopping
chopping: Chopping = None
acc: str = None
version: str = None

Expand Down Expand Up @@ -257,7 +284,10 @@ def get_domain_residues_from_numeric_chopping(self) -> List[int]:
return domain_residues

def __str__(self):
return f"{self.raw_id}/{self.chopping.to_str()}"
if self.chopping:
return f"{self.raw_id}/{self.chopping.to_str()}"
else:
return f"{self.raw_id}"


@dataclass
Expand Down
Loading

0 comments on commit d85a722

Please sign in to comment.