forked from TensorDuck/pyFrustration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
220 lines (180 loc) · 7.27 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import pyrosetta as pyr
import numpy as np
import os
import sys
import numpy.random as random
from mpi4py import MPI
import time
import math
from pyrosetta import Pose
#from pyrosetta import MPIJobDistributor
from pyrosetta.rosetta.protocols.relax import ClassicRelax
from pyrosetta.toolbox import cleanATOM
from pyrosetta.toolbox import get_secstruct
from pyrosetta.toolbox import mutate_residue
from pyrosetta import teaching as pyrt
from pyrosetta import standard_packer_task
from pyrosetta.rosetta.protocols.simple_moves import PackRotamersMover
import mdtraj as md
# Disable Printing:
def block_print():
    """Silence all subsequent stdout output by redirecting it to the null device."""
    devnull = open(os.devnull, "w")
    sys.stdout = devnull
# Restore Printing:
def enable_print():
    """Undo block_print() by pointing stdout back at the interpreter's original stream."""
    sys.stdout = sys.__stdout__
def clean_pdb_files(decoy_dir, file_list):
    """Run pyrosetta's cleanATOM on every PDB file in a directory.

    Parameters
    ----------
    decoy_dir : str
        Directory containing the PDB files.
    file_list : iterable of str
        File names (relative to decoy_dir) to clean. cleanATOM writes
        <name>.clean.pdb alongside each <name>.pdb.
    """
    cwd = os.getcwd()
    os.chdir(decoy_dir)
    try:
        for pdb_file in file_list:
            cleanATOM(pdb_file)  # outputs file_name.clean.pdb for file_name.pdb
    finally:
        # restore the working directory even if cleanATOM raises
        # (previously an exception left the process chdir'd into decoy_dir)
        os.chdir(cwd)
def compute_average_and_sd(list_of_params):
    """Compute the element-wise mean and standard deviation of a list of arrays.

    Parameters
    ----------
    list_of_params : list of np.ndarray
        Arrays of identical shape to average over.

    Returns
    -------
    average_params : np.ndarray
        Element-wise mean of the inputs.
    params_sd : np.ndarray
        Element-wise population standard deviation (divides by N, not N-1).

    Raises
    ------
    ValueError
        If list_of_params is empty. (Previously this only printed a
        warning with a py2-only print statement and then crashed on the
        division below.)
    """
    n_threads = len(list_of_params)
    if n_threads == 0:
        raise ValueError("Invalid number of values, number of values must be > 0")
    average_params = None
    for mparams in list_of_params:
        if average_params is None:
            # copy as float so the in-place division below is safe for int inputs
            average_params = np.array(mparams, dtype=float)
        else:
            average_params += mparams
    average_params /= float(n_threads)
    params_sd = np.zeros(np.shape(average_params))
    for mparams in list_of_params:
        params_sd += (mparams - average_params) ** 2
    params_sd = np.sqrt(params_sd / float(n_threads))
    return average_params, params_sd
def compute_pairwise(pose, scorefxn, order, weights, nresidues=35, use_contacts=None):
    """Build a symmetric matrix of weighted two-body interaction energies.

    For every residue pair (or only the 1-indexed pairs in use_contacts),
    evaluate the context-independent two-body terms listed in `order`,
    weight them by `weights`, and store the sum at the corresponding
    0-indexed positions of an (nresidues x nresidues) array.
    """
    # NOTE(review): total_E is unused apart from the commented-out sanity
    # check below; scorefxn(pose) is kept in case scoring has side effects
    # on the pose — confirm before removing.
    total_E = scorefxn(pose)
    pair_E = np.zeros((nresidues, nresidues))
    if use_contacts is None:
        these_contacts = [[i, j]
                          for i in range(1, nresidues + 1)
                          for j in range(i + 1, nresidues + 1)]
    else:
        these_contacts = use_contacts
    for contact in these_contacts:
        idx, jdx = contact[0], contact[1]
        emap = pyrt.EMapVector()
        scorefxn.eval_ci_2b(pose.residue(idx), pose.residue(jdx), pose, emap)
        this_E = sum(emap[term] * wt for term, wt in zip(order, weights))
        pair_E[idx - 1, jdx - 1] = this_E
        pair_E[jdx - 1, idx - 1] = this_E
    #assert (total_E - np.sum(pair_E)) < 0.01
    return pair_E
def compute_pairwise_allinteractions(pose, scorefxn, order, weights, nresidues=35, use_contacts=None):
    """Build a symmetric matrix where each pair's entry also includes both
    residues' interactions with every other residue.

    For each pair (idx, jdx) — all pairs, or the 1-indexed pairs in
    use_contacts — sum the weighted two-body terms of idx and jdx with
    every third residue, plus the idx-jdx interaction itself, and store
    the total at the 0-indexed positions of an (nresidues x nresidues)
    array.
    """
    # NOTE(review): total_E is unused apart from the commented-out check
    # below; scorefxn(pose) is kept in case scoring has side effects on
    # the pose — confirm before removing.
    total_E = scorefxn(pose)
    pair_E = np.zeros((nresidues, nresidues))

    def _accumulate(res_a, res_b, acc):
        # Add the weighted two-body terms between res_a and res_b onto acc,
        # term by term (preserves float accumulation order).
        emap = pyrt.EMapVector()
        scorefxn.eval_ci_2b(pose.residue(res_a), pose.residue(res_b), pose, emap)
        for term, wt in zip(order, weights):
            acc += emap[term] * wt
        return acc

    if use_contacts is None:
        these_contacts = [[i, j]
                          for i in range(1, nresidues + 1)
                          for j in range(i + 1, nresidues + 1)]
    else:
        these_contacts = use_contacts
    for contact in these_contacts:
        idx, jdx = contact[0], contact[1]
        this_E = 0.
        for other in range(1, nresidues + 1):
            if (other != idx) and (other != jdx):
                this_E = _accumulate(idx, other, this_E)
                this_E = _accumulate(jdx, other, this_E)
        this_E = _accumulate(idx, jdx, this_E)
        pair_E[idx - 1, jdx - 1] = this_E
        pair_E[jdx - 1, idx - 1] = this_E
    #assert (total_E - np.sum(pair_E)) < 0.01
    return pair_E
def compute_standard_rosetta_energy(pose):
    """Score the pose with pyrosetta's default full-atom score function
    and return the total energy."""
    return pyrt.get_fa_scorefxn()(pose)
def get_possible_residues(pose):
    """Return the pose's sequence as a list of one-letter residue codes.

    Parameters
    ----------
    pose : pyrosetta Pose (or any object with a ``sequence()`` method
        returning a string).

    Returns
    -------
    list of str
        One single-character string per residue, in sequence order.
    """
    # The original kept an unused hard-coded list of the 20 standard amino
    # acids here and built the result with a manual append loop; the result
    # is simply the sequence split into characters.
    return list(pose.sequence())
def determine_close_residues_from_file(native_file, probability_cutoff=0.8, radius_cutoff=0.5):
    """Load a structure file with mdtraj and find residue pairs in contact.

    Thin wrapper around determine_close_residues(); see that function for
    the cutoff semantics. Prints a summary of how many pairs were found.

    Returns
    -------
    (use_pairs, use_pairs_zero, use_score) — as returned by
    determine_close_residues(): 1-indexed pairs, 0-indexed pairs, and
    per-pair contact scores.
    """
    traj = md.load(native_file)
    use_pairs, use_pairs_zero, use_score = determine_close_residues(traj, probability_cutoff=probability_cutoff, radius_cutoff=radius_cutoff)
    # print() call form works under both Python 2 and 3
    # (was a Python-2-only print statement)
    print("In file %s, found %d close residues" % (native_file, np.shape(use_pairs)[0]))
    return use_pairs, use_pairs_zero, use_score
def determine_close_residues(traj, probability_cutoff=0.8, radius_cutoff=0.5):
    """Identify residue pairs that stay within radius_cutoff in a trajectory.

    All residue pairs separated by at least two positions are scanned with
    md.compute_contacts. A pair qualifies when the fraction of frames in
    which its distance is <= radius_cutoff exceeds probability_cutoff.

    Returns
    -------
    use_pairs : list of np.ndarray
        Qualifying pairs, 1-indexed (rosetta residue numbering).
    use_pairs_zero : list of np.ndarray
        The same pairs, 0-indexed.
    use_score : list
        For each qualifying pair, the sum of its two members' per-residue
        contact-count totals.
    """
    n_frames = traj.n_frames
    topology = traj.top
    candidate_pairs = np.array(
        [[i, j]
         for i in range(topology.n_residues)
         for j in range(i + 2, topology.n_residues)]).astype(int)
    distances, residue_pairs = md.compute_contacts(traj, contacts=candidate_pairs)
    #distances, residue_pairs = md.compute_contacts(traj)
    n_contacts = np.shape(residue_pairs)[0]
    assert np.shape(distances)[1] == n_contacts
    # indicator matrix: 1.0 wherever a frame has the contact formed
    count_matrix = np.zeros(np.shape(distances)).astype(float)
    count_matrix[np.where(distances <= radius_cutoff)] = 1.
    # total frames-in-contact per contact, then spread symmetrically onto a
    # residue x residue matrix so per-residue totals can be summed
    score_array = np.sum(count_matrix.astype(int), axis=0)
    max_indices = np.max(residue_pairs)
    score_matrix = np.zeros((max_indices + 1, max_indices + 1))
    for i_con in range(n_contacts):
        idx, jdx = residue_pairs[i_con, 0], residue_pairs[i_con, 1]
        score_matrix[idx, jdx] = score_array[i_con]
        score_matrix[jdx, idx] = score_array[i_con]
    score_by_residue = np.sum(score_matrix, axis=0)
    probability_matrix = np.sum(count_matrix, axis=0) / float(n_frames)
    assert np.shape(probability_matrix)[0] == n_contacts
    use_pairs = []
    use_pairs_zero = []
    use_score = []
    for i_con in range(n_contacts):
        if probability_matrix[i_con] > probability_cutoff:
            indices = residue_pairs[i_con, :]
            use_pairs_zero.append(indices)
            use_pairs.append(indices + 1)  # plus 1 converts to rosetta residue indices
            use_score.append(score_by_residue[indices[0]] + score_by_residue[indices[1]])
    return use_pairs, use_pairs_zero, use_score
def get_mpi_jobs(njobs, rank, size):
    """Round-robin assignment of job indices to an MPI rank.

    Rank ``rank`` of ``size`` workers gets jobs rank, rank+size,
    rank+2*size, ... up to (but excluding) njobs.
    """
    return range(rank, njobs, size)
def parse_mutations_file(file_name):
    """Parse a mutations/deletions control file.

    Format: a line starting with "#mutation" begins a block of
    "<index> <residue>" lines; "#deletion" begins a block of
    "<start> <end>" lines; any other "#" line ends the current block.
    Blank lines are ignored.

    Parameters
    ----------
    file_name : str
        Path to the control file.

    Returns
    -------
    mutations_list : list of [int, str]
        Residue index and one-letter target residue for each mutation.
    deletions_list : list of [int, int]
        Start and end indices for each deletion.
    """
    mutations_list = []
    deletions_list = []
    section_parsing = None
    # with-statement closes the file even on parse errors (was left open)
    with open(file_name, "r") as f:
        for line in f:
            stuff = line.strip().split()
            if not stuff:
                # skip blank lines (previously raised IndexError when a
                # blank line appeared inside a section)
                continue
            if line[0] == "#":
                if stuff[0] == "#mutation":
                    section_parsing = "mutations"
                elif stuff[0] == "#deletion":
                    section_parsing = "deletions"
                else:
                    section_parsing = None
            else:
                if section_parsing is None:
                    pass
                elif section_parsing == "mutations":
                    mutations_list.append([int(stuff[0]), stuff[1]])
                elif section_parsing == "deletions":
                    deletions_list.append([int(stuff[0]), int(stuff[1])])
    return mutations_list, deletions_list